aboutsummaryrefslogtreecommitdiffstats
path: root/cmd/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/publisher')
-rw-r--r--cmd/publisher/main.go417
1 files changed, 417 insertions, 0 deletions
diff --git a/cmd/publisher/main.go b/cmd/publisher/main.go
new file mode 100644
index 0000000..ab84616
--- /dev/null
+++ b/cmd/publisher/main.go
@@ -0,0 +1,417 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/joho/godotenv"
+ "hub/internal/weather"
+)
+
+type PublisherConfig struct {
+ // FMI API
+ FMIEndpoint string `env:"FMI_ENDPOINT"`
+
+ // Observation
+ StationID int `env:"STATION_ID"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
+ ObsTimestep int `env:"OBS_TIMESTEP"`
+ WatermarkFile string `env:"WATERMARK_FILE"`
+
+ // Forecast
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
+ HelLat float64 `env:"HEL_LAT"`
+ HelLon float64 `env:"HEL_LON"`
+
+ // MQTT
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
+}
+
+func loadPublisherConfig() *PublisherConfig {
+ _ = godotenv.Load()
+
+ return &PublisherConfig{
+ FMIEndpoint: weather.GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: weather.ParseInt(weather.GetEnv("STATION_ID", "100968")),
+ ObsPollEvery: weather.ParseDuration(weather.GetEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: weather.ParseDuration(weather.GetEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: weather.ParseInt(weather.GetEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: weather.GetEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: weather.ParseDuration(weather.GetEnv("FC_POLL_EVERY", "1h")),
+ HelLat: weather.ParseFloat(weather.GetEnv("HEL_LAT", "60.1699")),
+ HelLon: weather.ParseFloat(weather.GetEnv("HEL_LON", "24.9384")),
+ MQTTBroker: weather.GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: weather.GetEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: weather.GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: weather.GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: weather.ParseDuration(weather.GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: weather.ParseDuration(weather.GetEnv("MQTT_SESSION_EXP", "1h")),
+ LogLevel: weather.GetEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: weather.ParseDuration(weather.GetEnv("HTTP_TIMEOUT", "30s")),
+ }
+}
+
+func loadWatermark(filename string) time.Time {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ return time.Unix(sec, 0).UTC()
+}
+
+func storeWatermark(filename string, t time.Time) error {
+ data := []byte(fmt.Sprintf("%d", t.Unix()))
+ return os.WriteFile(filename, data, 0644)
+}
+
+func publishObservation(cm *autopaho.ConnectionManager, obs weather.Observation, logger *slog.Logger) error {
+ topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
+
+ // Safe JSON structure
+ type SafeObservation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value *weather.JSONFloat64 `json:"value,omitempty"`
+ }
+
+ var safeValue *weather.JSONFloat64
+ if obs.Value != nil {
+ v := weather.JSONFloat64(*obs.Value)
+ safeValue = &v
+ }
+
+ safeObs := SafeObservation{
+ Station: obs.Station,
+ Parameter: obs.Parameter,
+ Time: obs.Time,
+ Value: safeValue,
+ }
+
+ b, err := json.Marshal(safeObs)
+ if err != nil {
+ return err
+ }
+
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)},
+ {Key: "parameter", Value: obs.Parameter},
+ {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)},
+ {Key: "data_type", Value: "observation"},
+ {Key: "source", Value: "fmi"},
+ },
+ }
+
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: false,
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish observation: %w", err)
+ }
+
+ logger.Debug("Published observation",
+ "topic", topic,
+ "station", obs.Station,
+ "parameter", obs.Parameter,
+ "value", obs.Value,
+ "time", obs.Time)
+
+ return nil
+}
+
+func publishForecast(cm *autopaho.ConnectionManager, fc weather.ForecastValue, logger *slog.Logger) error {
+ topic := fmt.Sprintf(
+ "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
+ fc.RunTime.Format(time.RFC3339),
+ fc.Parameter,
+ )
+
+ // Safe JSON structure
+ type SafeForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value *weather.JSONFloat64 `json:"value,omitempty"`
+ }
+
+ var safeValue *weather.JSONFloat64
+ if fc.Value != nil {
+ v := weather.JSONFloat64(*fc.Value)
+ safeValue = &v
+ }
+
+ safeFc := SafeForecastValue{
+ Location: fc.Location,
+ Model: fc.Model,
+ RunTime: fc.RunTime,
+ ForecastTime: fc.ForecastTime,
+ Parameter: fc.Parameter,
+ Value: safeValue,
+ }
+
+ b, err := json.Marshal(safeFc)
+ if err != nil {
+ return err
+ }
+
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "model", Value: fc.Model},
+ {Key: "parameter", Value: fc.Parameter},
+ {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)},
+ {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)},
+ {Key: "data_type", Value: "forecast"},
+ {Key: "source", Value: "fmi"},
+ {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)},
+ {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)},
+ },
+ }
+
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: true, // Forecasts are retained since they don't change
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish forecast: %w", err)
+ }
+
+ logger.Debug("Published forecast",
+ "topic", topic,
+ "parameter", fc.Parameter,
+ "run_time", fc.RunTime,
+ "forecast_time", fc.ForecastTime)
+
+ return nil
+}
+
+func runPublisher(ctx context.Context, cfg *PublisherConfig, logger *slog.Logger) error {
+ // Load initial state
+ lastObs := loadWatermark(cfg.WatermarkFile)
+ lastFcRun := time.Time{}
+
+ // Create MQTT client
+ mqttCfg := weather.MQTTConfig{
+ Broker: cfg.MQTTBroker,
+ ClientID: cfg.MQTTClientID,
+ Username: cfg.MQTTUsername,
+ Password: cfg.MQTTPassword,
+ KeepAlive: cfg.MQTTKeepAlive,
+ SessionExpiry: cfg.MQTTSessionExp,
+ Timeout: cfg.HTTPTimeout,
+ }
+
+ mqttClient, err := weather.CreateMQTTClient(ctx, mqttCfg, func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ logger.Info("MQTT v5 connected",
+ "session_present", connAck.SessionPresent,
+ "keep_alive", cfg.MQTTKeepAlive)
+ })
+ if err != nil {
+ return fmt.Errorf("failed to connect to MQTT: %w", err)
+ }
+ defer func() {
+ discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := mqttClient.Disconnect(discCtx); err != nil {
+ logger.Error("Failed to disconnect MQTT", "error", err)
+ }
+ }()
+
+ // Create timers
+ obsTimer := time.NewTimer(0) // Start immediately
+ fcTimer := time.NewTimer(0) // Start immediately
+
+ defer obsTimer.Stop()
+ defer fcTimer.Stop()
+
+ logger.Info("Poller started",
+ "obs_interval", cfg.ObsPollEvery,
+ "fc_interval", cfg.FcPollEvery,
+ "mqtt_broker", cfg.MQTTBroker)
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Info("Poller stopping")
+ return nil
+
+ case <-obsTimer.C:
+ // Poll observations
+ obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag)
+ if obsEnd.After(lastObs) {
+ logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
+
+ url := weather.BuildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
+ data, err := weather.FetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch observations", "error", err)
+ } else {
+ obs, maxT, err := weather.ParseObservations(data, cfg.StationID, lastObs)
+
+ if err != nil {
+ logger.Error("Failed to parse observations", "error", err)
+ } else if len(obs) > 0 {
+ // Publish observations
+ successCount := 0
+ for _, o := range obs {
+ if err := publishObservation(mqttClient, o, logger); err != nil {
+ logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter)
+ } else {
+ successCount++
+ }
+ }
+
+ // Update watermark
+ lastObs = maxT
+ if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil {
+ logger.Error("Failed to store watermark", "error", err)
+ }
+
+ logger.Info("Published observations",
+ "successful", successCount,
+ "total", len(obs),
+ "new_watermark", lastObs)
+ } else {
+ logger.Debug("No new observations found")
+ }
+ }
+ }
+ obsTimer.Reset(cfg.ObsPollEvery)
+
+ case <-fcTimer.C:
+ // Poll forecast
+ now := time.Now().UTC()
+ start := now
+ end := now.Add(48 * time.Hour)
+
+ logger.Info("Polling forecast", "from", start, "to", end)
+
+ url := weather.BuildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
+ data, err := weather.FetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch forecast", "error", err)
+ } else {
+ fc, runTime, err := weather.ParseForecast(data, cfg.HelLat, cfg.HelLon)
+
+ if err != nil {
+ logger.Error("Failed to parse forecast", "error", err)
+ } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
+ // Publish forecast
+ successCount := 0
+ for _, f := range fc {
+ if err := publishForecast(mqttClient, f, logger); err != nil {
+ logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter)
+ } else {
+ successCount++
+ }
+ }
+
+ lastFcRun = runTime
+ logger.Info("Published forecast",
+ "successful", successCount,
+ "total", len(fc),
+ "run_time", runTime)
+ } else {
+ logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun)
+ }
+ }
+ fcTimer.Reset(cfg.FcPollEvery)
+ }
+ }
+}
+
+func main() {
+ cfg := loadPublisherConfig()
+
+ // Setup logging
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: parseLogLevel(cfg.LogLevel),
+ }))
+
+ // Create context for graceful shutdown
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Setup signal handling
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ sig := <-sigChan
+ logger.Info("Received signal", "signal", sig)
+ cancel()
+ }()
+
+ // Run the publisher
+ logger.Info("Starting weather data publisher with MQTT v5")
+ if err := runPublisher(ctx, cfg, logger); err != nil {
+ logger.Error("Publisher failed", "error", err)
+ os.Exit(1)
+ }
+
+ logger.Info("Publisher stopped gracefully")
+}
+
+func parseLogLevel(level string) slog.Level {
+ switch strings.ToLower(level) {
+ case "debug":
+ return slog.LevelDebug
+ case "warn", "warning":
+ return slog.LevelWarn
+ case "error":
+ return slog.LevelError
+ default:
+ return slog.LevelInfo
+ }
+}