diff options
| author | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-04 11:52:47 +0200 |
|---|---|---|
| committer | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-04 11:52:47 +0200 |
| commit | 59491201976316a30ffc475dd99b0af02b5e997d (patch) | |
| tree | ecf395594d5d289d855eba16f786e0fb66c1d814 /cmd/publisher | |
| parent | 4e0ca0509c6b314eea8a7b2df6d093f5d9b7e70f (diff) | |
| download | pub-59491201976316a30ffc475dd99b0af02b5e997d.tar.zst | |
Both publisher and subscriber
Diffstat (limited to 'cmd/publisher')
| -rw-r--r-- | cmd/publisher/main.go | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/cmd/publisher/main.go b/cmd/publisher/main.go new file mode 100644 index 0000000..ab84616 --- /dev/null +++ b/cmd/publisher/main.go @@ -0,0 +1,417 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/joho/godotenv" + "hub/internal/weather" +) + +type PublisherConfig struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT"` + + // Observation + StationID int `env:"STATION_ID"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` + ObsTimestep int `env:"OBS_TIMESTEP"` + WatermarkFile string `env:"WATERMARK_FILE"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY"` + HelLat float64 `env:"HEL_LAT"` + HelLon float64 `env:"HEL_LON"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` +} + +func loadPublisherConfig() *PublisherConfig { + _ = godotenv.Load() + + return &PublisherConfig{ + FMIEndpoint: weather.GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: weather.ParseInt(weather.GetEnv("STATION_ID", "100968")), + ObsPollEvery: weather.ParseDuration(weather.GetEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: weather.ParseDuration(weather.GetEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: weather.ParseInt(weather.GetEnv("OBS_TIMESTEP", "60")), + WatermarkFile: weather.GetEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: weather.ParseDuration(weather.GetEnv("FC_POLL_EVERY", "1h")), + HelLat: weather.ParseFloat(weather.GetEnv("HEL_LAT", "60.1699")), + HelLon: weather.ParseFloat(weather.GetEnv("HEL_LON", "24.9384")), + MQTTBroker: weather.GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: weather.GetEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: weather.GetEnv("MQTT_USERNAME", ""), + MQTTPassword: weather.GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: weather.ParseDuration(weather.GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: weather.ParseDuration(weather.GetEnv("MQTT_SESSION_EXP", "1h")), + LogLevel: weather.GetEnv("LOG_LEVEL", "info"), + HTTPTimeout: weather.ParseDuration(weather.GetEnv("HTTP_TIMEOUT", "30s")), + } +} + +func loadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +func storeWatermark(filename string, t time.Time) error { + data := []byte(fmt.Sprintf("%d", t.Unix())) + return os.WriteFile(filename, data, 0644) +} + +func publishObservation(cm *autopaho.ConnectionManager, obs weather.Observation, logger *slog.Logger) error { + topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) + + // Safe JSON structure + type SafeObservation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value *weather.JSONFloat64 `json:"value,omitempty"` + } + + var safeValue *weather.JSONFloat64 + if obs.Value != nil { + v := weather.JSONFloat64(*obs.Value) + safeValue = &v + } + + safeObs := SafeObservation{ + Station: obs.Station, + Parameter: obs.Parameter, + Time: obs.Time, + Value: safeValue, + } + + b, err := json.Marshal(safeObs) + if err != nil { + return err + } + + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, + {Key: "parameter", Value: obs.Parameter}, + {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, + {Key: "data_type", Value: "observation"}, + {Key: "source", Value: "fmi"}, + }, + } + + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: false, + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish observation: %w", err) + } + + logger.Debug("Published observation", + "topic", topic, + "station", obs.Station, + "parameter", obs.Parameter, + "value", obs.Value, + "time", obs.Time) + + return nil +} + +func publishForecast(cm *autopaho.ConnectionManager, fc weather.ForecastValue, logger *slog.Logger) error { + topic := fmt.Sprintf( + "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", + fc.RunTime.Format(time.RFC3339), + fc.Parameter, + ) + + // Safe JSON structure + type SafeForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value *weather.JSONFloat64 `json:"value,omitempty"` + } + + var safeValue *weather.JSONFloat64 + if fc.Value != nil { + v := weather.JSONFloat64(*fc.Value) + safeValue = &v + } + + safeFc := SafeForecastValue{ + Location: fc.Location, + Model: fc.Model, + RunTime: fc.RunTime, + ForecastTime: fc.ForecastTime, + Parameter: fc.Parameter, + Value: safeValue, + } + + b, err := json.Marshal(safeFc) + if err != nil { + return err + } + + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "model", Value: fc.Model}, + {Key: "parameter", Value: fc.Parameter}, + {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, + {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, + {Key: "data_type", Value: "forecast"}, + {Key: "source", Value: "fmi"}, + {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, + {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, + }, + } + + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: true, // Forecasts are retained since they don't change + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish forecast: %w", err) + } + + logger.Debug("Published forecast", + "topic", topic, + "parameter", fc.Parameter, + "run_time", fc.RunTime, + "forecast_time", fc.ForecastTime) + + return nil +} + +func runPublisher(ctx context.Context, cfg *PublisherConfig, logger *slog.Logger) error { + // Load initial state + lastObs := loadWatermark(cfg.WatermarkFile) + lastFcRun := time.Time{} + + // Create MQTT client + mqttCfg := weather.MQTTConfig{ + Broker: cfg.MQTTBroker, + ClientID: cfg.MQTTClientID, + Username: cfg.MQTTUsername, + Password: cfg.MQTTPassword, + KeepAlive: cfg.MQTTKeepAlive, + SessionExpiry: cfg.MQTTSessionExp, + Timeout: cfg.HTTPTimeout, + } + + mqttClient, err := weather.CreateMQTTClient(ctx, mqttCfg, func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + logger.Info("MQTT v5 connected", + "session_present", connAck.SessionPresent, + "keep_alive", cfg.MQTTKeepAlive) + }) + if err != nil { + return fmt.Errorf("failed to connect to MQTT: %w", err) + } + defer func() { + discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := mqttClient.Disconnect(discCtx); err != nil { + logger.Error("Failed to disconnect MQTT", "error", err) + } + }() + + // Create timers + obsTimer := time.NewTimer(0) // Start immediately + fcTimer := time.NewTimer(0) // Start immediately + + defer obsTimer.Stop() + defer fcTimer.Stop() + + logger.Info("Poller started", + "obs_interval", cfg.ObsPollEvery, + "fc_interval", cfg.FcPollEvery, + "mqtt_broker", cfg.MQTTBroker) + + for { + select { + case <-ctx.Done(): + logger.Info("Poller stopping") + return nil + + case <-obsTimer.C: + // Poll observations + obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) + if obsEnd.After(lastObs) { + logger.Info("Polling observations", "from", lastObs, "to", obsEnd) + + url := weather.BuildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) + data, err := weather.FetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch observations", "error", err) + } else { + obs, maxT, err := weather.ParseObservations(data, cfg.StationID, lastObs) + + if err != nil { + logger.Error("Failed to parse observations", "error", err) + } else if len(obs) > 0 { + // Publish observations + successCount := 0 + for _, o := range obs { + if err := publishObservation(mqttClient, o, logger); err != nil { + logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) + } else { + successCount++ + } + } + + // Update watermark + lastObs = maxT + if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { + logger.Error("Failed to store watermark", "error", err) + } + + logger.Info("Published observations", + "successful", successCount, + "total", len(obs), + "new_watermark", lastObs) + } else { + logger.Debug("No new observations found") + } + } + } + obsTimer.Reset(cfg.ObsPollEvery) + + case <-fcTimer.C: + // Poll forecast + now := time.Now().UTC() + start := now + end := now.Add(48 * time.Hour) + + logger.Info("Polling forecast", "from", start, "to", end) + + url := weather.BuildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) + data, err := weather.FetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch forecast", "error", err) + } else { + fc, runTime, err := weather.ParseForecast(data, cfg.HelLat, cfg.HelLon) + + if err != nil { + logger.Error("Failed to parse forecast", "error", err) + } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { + // Publish forecast + successCount := 0 + for _, f := range fc { + if err := publishForecast(mqttClient, f, logger); err != nil { + logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) + } else { + successCount++ + } + } + + lastFcRun = runTime + logger.Info("Published forecast", + "successful", successCount, + "total", len(fc), + "run_time", runTime) + } else { + logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) + } + } + fcTimer.Reset(cfg.FcPollEvery) + } + } +} + +func main() { + cfg := loadPublisherConfig() + + // Setup logging + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: parseLogLevel(cfg.LogLevel), + })) + + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Info("Received signal", "signal", sig) + cancel() + }() + + // Run the publisher + logger.Info("Starting weather data publisher with MQTT v5") + if err := runPublisher(ctx, cfg, logger); err != nil { + logger.Error("Publisher failed", "error", err) + os.Exit(1) + } + + logger.Info("Publisher stopped gracefully") +} + +func parseLogLevel(level string) slog.Level { + switch strings.ToLower(level) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} |
