package main import ( "context" "encoding/json" "fmt" "log/slog" "os" "os/signal" "strconv" "strings" "syscall" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/joho/godotenv" "hub/internal/weather" ) type PublisherConfig struct { // FMI API FMIEndpoint string `env:"FMI_ENDPOINT"` // Observation StationID int `env:"STATION_ID"` ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` ObsTimestep int `env:"OBS_TIMESTEP"` WatermarkFile string `env:"WATERMARK_FILE"` // Forecast FcPollEvery time.Duration `env:"FC_POLL_EVERY"` HelLat float64 `env:"HEL_LAT"` HelLon float64 `env:"HEL_LON"` // MQTT MQTTBroker string `env:"MQTT_BROKER"` MQTTClientID string `env:"MQTT_CLIENT_ID"` MQTTUsername string `env:"MQTT_USERNAME"` MQTTPassword string `env:"MQTT_PASSWORD"` MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` // Application LogLevel string `env:"LOG_LEVEL"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` } func loadPublisherConfig() *PublisherConfig { _ = godotenv.Load() return &PublisherConfig{ FMIEndpoint: weather.GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), StationID: weather.ParseInt(weather.GetEnv("STATION_ID", "100968")), ObsPollEvery: weather.ParseDuration(weather.GetEnv("OBS_POLL_EVERY", "5m")), ObsSafetyLag: weather.ParseDuration(weather.GetEnv("OBS_SAFETY_LAG", "2m")), ObsTimestep: weather.ParseInt(weather.GetEnv("OBS_TIMESTEP", "60")), WatermarkFile: weather.GetEnv("WATERMARK_FILE", "obs_watermark.txt"), FcPollEvery: weather.ParseDuration(weather.GetEnv("FC_POLL_EVERY", "1h")), HelLat: weather.ParseFloat(weather.GetEnv("HEL_LAT", "60.1699")), HelLon: weather.ParseFloat(weather.GetEnv("HEL_LON", "24.9384")), MQTTBroker: weather.GetEnv("MQTT_BROKER", "tcp://localhost:1883"), MQTTClientID: weather.GetEnv("MQTT_CLIENT_ID", "fmi-publisher"), MQTTUsername: weather.GetEnv("MQTT_USERNAME", ""), MQTTPassword: weather.GetEnv("MQTT_PASSWORD", ""), MQTTKeepAlive: weather.ParseDuration(weather.GetEnv("MQTT_KEEP_ALIVE", "60s")), MQTTSessionExp: weather.ParseDuration(weather.GetEnv("MQTT_SESSION_EXP", "1h")), LogLevel: weather.GetEnv("LOG_LEVEL", "info"), HTTPTimeout: weather.ParseDuration(weather.GetEnv("HTTP_TIMEOUT", "30s")), } } func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } return time.Unix(sec, 0).UTC() } func storeWatermark(filename string, t time.Time) error { data := []byte(fmt.Sprintf("%d", t.Unix())) return os.WriteFile(filename, data, 0644) } func publishObservation(cm *autopaho.ConnectionManager, obs weather.Observation, logger *slog.Logger) error { topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) // Safe JSON structure type SafeObservation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value *weather.JSONFloat64 `json:"value,omitempty"` } var safeValue *weather.JSONFloat64 if obs.Value != nil { v := weather.JSONFloat64(*obs.Value) safeValue = &v } safeObs := SafeObservation{ Station: obs.Station, Parameter: obs.Parameter, Time: obs.Time, Value: safeValue, } b, err := json.Marshal(safeObs) if err != nil { return err } // MQTT v5 publish properties props := &paho.PublishProperties{ User: []paho.UserProperty{ {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, {Key: "parameter", Value: obs.Parameter}, {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, {Key: "data_type", Value: "observation"}, {Key: "source", Value: "fmi"}, }, } pb := &paho.Publish{ Topic: topic, QoS: 1, Retain: false, Payload: b, Properties: props, } // Publish with context timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err = cm.Publish(ctx, pb) if err != nil { return fmt.Errorf("failed to publish observation: %w", err) } logger.Debug("Published observation", "topic", topic, "station", obs.Station, "parameter", obs.Parameter, "value", obs.Value, "time", obs.Time) return nil } func publishForecast(cm *autopaho.ConnectionManager, fc weather.ForecastValue, logger *slog.Logger) error { topic := fmt.Sprintf( "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", fc.RunTime.Format(time.RFC3339), fc.Parameter, ) // Safe JSON structure type SafeForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value *weather.JSONFloat64 `json:"value,omitempty"` } var safeValue *weather.JSONFloat64 if fc.Value != nil { v := weather.JSONFloat64(*fc.Value) safeValue = &v } safeFc := SafeForecastValue{ Location: fc.Location, Model: fc.Model, RunTime: fc.RunTime, ForecastTime: fc.ForecastTime, Parameter: fc.Parameter, Value: safeValue, } b, err := json.Marshal(safeFc) if err != nil { return err } // MQTT v5 publish properties props := &paho.PublishProperties{ User: []paho.UserProperty{ {Key: "model", Value: fc.Model}, {Key: "parameter", Value: fc.Parameter}, {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, {Key: "data_type", Value: "forecast"}, {Key: "source", Value: "fmi"}, {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, }, } pb := &paho.Publish{ Topic: topic, QoS: 1, Retain: true, // Forecasts are retained since they don't change Payload: b, Properties: props, } // Publish with context timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err = cm.Publish(ctx, pb) if err != nil { return fmt.Errorf("failed to publish forecast: %w", err) } logger.Debug("Published forecast", "topic", topic, "parameter", fc.Parameter, "run_time", fc.RunTime, "forecast_time", fc.ForecastTime) return nil } func runPublisher(ctx context.Context, cfg *PublisherConfig, logger *slog.Logger) error { // Load initial state lastObs := loadWatermark(cfg.WatermarkFile) lastFcRun := time.Time{} // Create MQTT client mqttCfg := weather.MQTTConfig{ Broker: cfg.MQTTBroker, ClientID: cfg.MQTTClientID, Username: cfg.MQTTUsername, Password: cfg.MQTTPassword, KeepAlive: cfg.MQTTKeepAlive, SessionExpiry: cfg.MQTTSessionExp, Timeout: cfg.HTTPTimeout, } mqttClient, err := weather.CreateMQTTClient(ctx, mqttCfg, func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { logger.Info("MQTT v5 connected", "session_present", connAck.SessionPresent, "keep_alive", cfg.MQTTKeepAlive) }) if err != nil { return fmt.Errorf("failed to connect to MQTT: %w", err) } defer func() { discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := mqttClient.Disconnect(discCtx); err != nil { logger.Error("Failed to disconnect MQTT", "error", err) } }() // Create timers obsTimer := time.NewTimer(0) // Start immediately fcTimer := time.NewTimer(0) // Start immediately defer obsTimer.Stop() defer fcTimer.Stop() logger.Info("Poller started", "obs_interval", cfg.ObsPollEvery, "fc_interval", cfg.FcPollEvery, "mqtt_broker", cfg.MQTTBroker) for { select { case <-ctx.Done(): logger.Info("Poller stopping") return nil case <-obsTimer.C: // Poll observations obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) if obsEnd.After(lastObs) { logger.Info("Polling observations", "from", lastObs, "to", obsEnd) url := weather.BuildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) data, err := weather.FetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch observations", "error", err) } else { obs, maxT, err := weather.ParseObservations(data, cfg.StationID, lastObs) if err != nil { logger.Error("Failed to parse observations", "error", err) } else if len(obs) > 0 { // Publish observations successCount := 0 for _, o := range obs { if err := publishObservation(mqttClient, o, logger); err != nil { logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) } else { successCount++ } } // Update watermark lastObs = maxT if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { logger.Error("Failed to store watermark", "error", err) } logger.Info("Published observations", "successful", successCount, "total", len(obs), "new_watermark", lastObs) } else { logger.Debug("No new observations found") } } } obsTimer.Reset(cfg.ObsPollEvery) case <-fcTimer.C: // Poll forecast now := time.Now().UTC() start := now end := now.Add(48 * time.Hour) logger.Info("Polling forecast", "from", start, "to", end) url := weather.BuildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) data, err := weather.FetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch forecast", "error", err) } else { fc, runTime, err := weather.ParseForecast(data, cfg.HelLat, cfg.HelLon) if err != nil { logger.Error("Failed to parse forecast", "error", err) } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { // Publish forecast successCount := 0 for _, f := range fc { if err := publishForecast(mqttClient, f, logger); err != nil { logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) } else { successCount++ } } lastFcRun = runTime logger.Info("Published forecast", "successful", successCount, "total", len(fc), "run_time", runTime) } else { logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) } } fcTimer.Reset(cfg.FcPollEvery) } } } func main() { cfg := loadPublisherConfig() // Setup logging logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: parseLogLevel(cfg.LogLevel), })) // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Setup signal handling sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigChan logger.Info("Received signal", "signal", sig) cancel() }() // Run the publisher logger.Info("Starting weather data publisher with MQTT v5") if err := runPublisher(ctx, cfg, logger); err != nil { logger.Error("Publisher failed", "error", err) os.Exit(1) } logger.Info("Publisher stopped gracefully") } func parseLogLevel(level string) slog.Level { switch strings.ToLower(level) { case "debug": return slog.LevelDebug case "warn", "warning": return slog.LevelWarn case "error": return slog.LevelError default: return slog.LevelInfo } }