diff options
| -rw-r--r-- | README.md | 30 | ||||
| -rw-r--r-- | cmd/publisher/main.go | 417 | ||||
| -rw-r--r-- | cmd/subscriber/main.go | 310 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | internal/weather/config.go | 102 | ||||
| -rw-r--r-- | internal/weather/fmi.go | 258 | ||||
| -rw-r--r-- | internal/weather/mqtt.go | 76 | ||||
| -rw-r--r-- | internal/weather/types.go | 44 | ||||
| -rw-r--r-- | internal/weather/utils.go | 82 | ||||
| -rw-r--r-- | main.go | 792 | ||||
| -rw-r--r-- | pub.service | 4 |
11 files changed, 1322 insertions, 795 deletions
@@ -3,3 +3,33 @@ ## Downloading source data curl -G "https://opendata.fmi.fi/wfs" --data-urlencode "service=WFS" --data-urlencode "version=2.0.0" --data-urlencode "request=GetFeature" --data-urlencode "storedquery_id=fmi::observations::weather::timevaluepair" --data-urlencode "fmisid=100968" --data-urlencode "starttime=2025-12-28T00:00:00Z" --data-urlencode "endtime=2025-12-28T01:00:00Z" --data-urlencode "timestep=60" -o observation.xml + +### Instructions + +Build + +```bash +go build -o bin/publisher ./cmd/publisher +go build -o bin/subscriber ./cmd/subscriber +``` + +Run + +```bash +go run ./cmd/publisher +go run ./cmd/subscriber +``` + +Install + +```bash +go install ./cmd/publisher +go install ./cmd/subscriber +``` + +Then run from anywhere + +```bash +publisher +subscriber +``` diff --git a/cmd/publisher/main.go b/cmd/publisher/main.go new file mode 100644 index 0000000..ab84616 --- /dev/null +++ b/cmd/publisher/main.go @@ -0,0 +1,417 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/joho/godotenv" + "hub/internal/weather" +) + +type PublisherConfig struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT"` + + // Observation + StationID int `env:"STATION_ID"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` + ObsTimestep int `env:"OBS_TIMESTEP"` + WatermarkFile string `env:"WATERMARK_FILE"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY"` + HelLat float64 `env:"HEL_LAT"` + HelLon float64 `env:"HEL_LON"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` +} + +func loadPublisherConfig() *PublisherConfig { + _ = godotenv.Load() + + return &PublisherConfig{ + FMIEndpoint: weather.GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: weather.ParseInt(weather.GetEnv("STATION_ID", "100968")), + ObsPollEvery: weather.ParseDuration(weather.GetEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: weather.ParseDuration(weather.GetEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: weather.ParseInt(weather.GetEnv("OBS_TIMESTEP", "60")), + WatermarkFile: weather.GetEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: weather.ParseDuration(weather.GetEnv("FC_POLL_EVERY", "1h")), + HelLat: weather.ParseFloat(weather.GetEnv("HEL_LAT", "60.1699")), + HelLon: weather.ParseFloat(weather.GetEnv("HEL_LON", "24.9384")), + MQTTBroker: weather.GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: weather.GetEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: weather.GetEnv("MQTT_USERNAME", ""), + MQTTPassword: weather.GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: weather.ParseDuration(weather.GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: weather.ParseDuration(weather.GetEnv("MQTT_SESSION_EXP", "1h")), + LogLevel: weather.GetEnv("LOG_LEVEL", "info"), + HTTPTimeout: weather.ParseDuration(weather.GetEnv("HTTP_TIMEOUT", "30s")), + } +} + +func loadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +func storeWatermark(filename string, t time.Time) error { + data := []byte(fmt.Sprintf("%d", t.Unix())) + return os.WriteFile(filename, data, 0644) +} + +func publishObservation(cm *autopaho.ConnectionManager, obs weather.Observation, logger *slog.Logger) error { + topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) + + // Safe JSON structure + type SafeObservation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value *weather.JSONFloat64 `json:"value,omitempty"` + } + + var safeValue *weather.JSONFloat64 + if obs.Value != nil { + v := weather.JSONFloat64(*obs.Value) + safeValue = &v + } + + safeObs := SafeObservation{ + Station: obs.Station, + Parameter: obs.Parameter, + Time: obs.Time, + Value: safeValue, + } + + b, err := json.Marshal(safeObs) + if err != nil { + return err + } + + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, + {Key: "parameter", Value: obs.Parameter}, + {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, + {Key: "data_type", Value: "observation"}, + {Key: "source", Value: "fmi"}, + }, + } + + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: false, + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish observation: %w", err) + } + + logger.Debug("Published observation", + "topic", topic, + "station", obs.Station, + "parameter", obs.Parameter, + "value", obs.Value, + "time", obs.Time) + + return nil +} + +func publishForecast(cm *autopaho.ConnectionManager, fc weather.ForecastValue, logger *slog.Logger) error { + topic := fmt.Sprintf( + "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", + fc.RunTime.Format(time.RFC3339), + fc.Parameter, + ) + + // Safe JSON structure + type SafeForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value *weather.JSONFloat64 `json:"value,omitempty"` + } + + var safeValue *weather.JSONFloat64 + if fc.Value != nil { + v := weather.JSONFloat64(*fc.Value) + safeValue = &v + } + + safeFc := SafeForecastValue{ + Location: fc.Location, + Model: fc.Model, + RunTime: fc.RunTime, + ForecastTime: fc.ForecastTime, + Parameter: fc.Parameter, + Value: safeValue, + } + + b, err := json.Marshal(safeFc) + if err != nil { + return err + } + + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "model", Value: fc.Model}, + {Key: "parameter", Value: fc.Parameter}, + {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, + {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, + {Key: "data_type", Value: "forecast"}, + {Key: "source", Value: "fmi"}, + {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, + {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, + }, + } + + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: true, // Forecasts are retained since they don't change + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish forecast: %w", err) + } + + logger.Debug("Published forecast", + "topic", topic, + "parameter", fc.Parameter, + "run_time", fc.RunTime, + "forecast_time", fc.ForecastTime) + + return nil +} + +func runPublisher(ctx context.Context, cfg *PublisherConfig, logger *slog.Logger) error { + // Load initial state + lastObs := loadWatermark(cfg.WatermarkFile) + lastFcRun := time.Time{} + + // Create MQTT client + mqttCfg := weather.MQTTConfig{ + Broker: cfg.MQTTBroker, + ClientID: cfg.MQTTClientID, + Username: cfg.MQTTUsername, + Password: cfg.MQTTPassword, + KeepAlive: cfg.MQTTKeepAlive, + SessionExpiry: cfg.MQTTSessionExp, + Timeout: cfg.HTTPTimeout, + } + + mqttClient, err := weather.CreateMQTTClient(ctx, mqttCfg, func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + logger.Info("MQTT v5 connected", + "session_present", connAck.SessionPresent, + "keep_alive", cfg.MQTTKeepAlive) + }) + if err != nil { + return fmt.Errorf("failed to connect to MQTT: %w", err) + } + defer func() { + discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := mqttClient.Disconnect(discCtx); err != nil { + logger.Error("Failed to disconnect MQTT", "error", err) + } + }() + + // Create timers + obsTimer := time.NewTimer(0) // Start immediately + fcTimer := time.NewTimer(0) // Start immediately + + defer obsTimer.Stop() + defer fcTimer.Stop() + + logger.Info("Poller started", + "obs_interval", cfg.ObsPollEvery, + "fc_interval", cfg.FcPollEvery, + "mqtt_broker", cfg.MQTTBroker) + + for { + select { + case <-ctx.Done(): + logger.Info("Poller stopping") + return nil + + case <-obsTimer.C: + // Poll observations + obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) + if obsEnd.After(lastObs) { + logger.Info("Polling observations", "from", lastObs, "to", obsEnd) + + url := weather.BuildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) + data, err := weather.FetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch observations", "error", err) + } else { + obs, maxT, err := weather.ParseObservations(data, cfg.StationID, lastObs) + + if err != nil { + logger.Error("Failed to parse observations", "error", err) + } else if len(obs) > 0 { + // Publish observations + successCount := 0 + for _, o := range obs { + if err := publishObservation(mqttClient, o, logger); err != nil { + logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) + } else { + successCount++ + } + } + + // Update watermark + lastObs = maxT + if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { + logger.Error("Failed to store watermark", "error", err) + } + + logger.Info("Published observations", + "successful", successCount, + "total", len(obs), + "new_watermark", lastObs) + } else { + logger.Debug("No new observations found") + } + } + } + obsTimer.Reset(cfg.ObsPollEvery) + + case <-fcTimer.C: + // Poll forecast + now := time.Now().UTC() + start := now + end := now.Add(48 * time.Hour) + + logger.Info("Polling forecast", "from", start, "to", end) + + url := weather.BuildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) + data, err := weather.FetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch forecast", "error", err) + } else { + fc, runTime, err := weather.ParseForecast(data, cfg.HelLat, cfg.HelLon) + + if err != nil { + logger.Error("Failed to parse forecast", "error", err) + } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { + // Publish forecast + successCount := 0 + for _, f := range fc { + if err := publishForecast(mqttClient, f, logger); err != nil { + logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) + } else { + successCount++ + } + } + + lastFcRun = runTime + logger.Info("Published forecast", + "successful", successCount, + "total", len(fc), + "run_time", runTime) + } else { + logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) + } + } + fcTimer.Reset(cfg.FcPollEvery) + } + } +} + +func main() { + cfg := loadPublisherConfig() + + // Setup logging + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: parseLogLevel(cfg.LogLevel), + })) + + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Info("Received signal", "signal", sig) + cancel() + }() + + // Run the publisher + logger.Info("Starting weather data publisher with MQTT v5") + if err := runPublisher(ctx, cfg, logger); err != nil { + logger.Error("Publisher failed", "error", err) + os.Exit(1) + } + + logger.Info("Publisher stopped gracefully") +} + +func parseLogLevel(level string) slog.Level { + switch strings.ToLower(level) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/cmd/subscriber/main.go b/cmd/subscriber/main.go new file mode 100644 index 0000000..f561d60 --- /dev/null +++ b/cmd/subscriber/main.go @@ -0,0 +1,310 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/url" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/joho/godotenv" + "hub/internal/weather" +) + +// MessageHandler wraps the function to process incoming messages +type MessageHandler func(*paho.Publish) + +func main() { + // Load environment variables + _ = godotenv.Load() + + // Load configuration + cfg := weather.LoadSubscriberConfig() + + // Setup logging + var logLevel slog.Level + switch strings.ToLower(cfg.LogLevel) { + case "debug": + logLevel = slog.LevelDebug + case "warn", "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + default: + logLevel = slog.LevelInfo + } + + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })) + + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Info("Received signal", "signal", sig) + cancel() + }() + + // Run the subscriber + logger.Info("Starting MQTT v5 subscriber for FMI weather data", + "broker", cfg.MQTTBroker, + "topics", cfg.Topics, + "qos", cfg.QoS) + + if err := runSubscriber(ctx, cfg, logger); err != nil { + logger.Error("Subscriber failed", "error", err) + os.Exit(1) + } + + logger.Info("Subscriber stopped gracefully") +} + +func runSubscriber(ctx context.Context, cfg *weather.SubscriberConfig, logger *slog.Logger) error { + // Initialize statistics + stats := weather.InitTopicStats() + statsMutex := &sync.RWMutex{} + + // Create MQTT client with custom configuration + serverURL, err := url.Parse(cfg.MQTTBroker) + if err != nil { + return fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + // Create message handler + messageHandler := func(pr paho.PublishReceived) (bool, error) { + go handleMessage(pr.Packet, logger, stats, statsMutex) + return true, nil + } + + // Client configuration + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), + OnPublishReceived: []func(paho.PublishReceived) (bool, error){ + messageHandler, + }, + OnServerDisconnect: func(d *paho.Disconnect) { + if d != nil && d.ReasonCode != 0 { + logger.Warn("MQTT disconnected", "reason", d.ReasonCode) + } else { + logger.Info("MQTT disconnected gracefully") + } + }, + }, + ConnectUsername: cfg.MQTTUsername, + ConnectPassword: []byte(cfg.MQTTPassword), + ConnectTimeout: 30 * time.Second, + } + + cliCfg.CleanStartOnInitialConnection = false + cliCfg.SessionExpiryInterval = uint32(cfg.MQTTSessionExp.Seconds()) + + // Connection callbacks + cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + logger.Info("MQTT v5 connected", + "session_present", connAck.SessionPresent, + "keep_alive", cfg.MQTTKeepAlive) + + // Subscribe to topics after connection is established + if len(cfg.Topics) > 0 { + subscriptions := make([]paho.SubscribeOptions, len(cfg.Topics)) + for i, topic := range cfg.Topics { + subscriptions[i] = paho.SubscribeOptions{ + Topic: topic, + QoS: byte(cfg.QoS), + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // FIXED: SubscriptionIdentifier is a pointer + subID := int(1) + subProps := &paho.SubscribeProperties{ + SubscriptionIdentifier: &subID, + } + + if _, err := cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: subscriptions, + Properties: subProps, + }); err != nil { + logger.Error("Failed to subscribe", "error", err) + } else { + logger.Info("Subscribed to topics", + "topics", cfg.Topics, + "qos", cfg.QoS) + } + } + } + + cliCfg.OnConnectError = func(err error) { + logger.Error("MQTT connection error", "error", err) + } + + // Connect to broker + mqttClient, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return fmt.Errorf("failed to create MQTT connection: %w", err) + } + + // Wait for connection + err = mqttClient.AwaitConnection(ctx) + if err != nil { + return fmt.Errorf("failed to establish MQTT connection: %w", err) + } + + defer func() { + discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := mqttClient.Disconnect(discCtx); err != nil { + logger.Error("Failed to disconnect MQTT", "error", err) + } + }() + + logger.Info("Subscriber started and waiting for messages...") + + // Statistics reporting goroutine + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Print final statistics + statsMutex.RLock() + finalStats := stats.MessagesReceived + statsMutex.RUnlock() + + if len(finalStats) > 0 { + logger.Info("Final message statistics", "stats", finalStats) + } + return nil + + case <-ticker.C: + if cfg.EnableDebug { + statsMutex.RLock() + currentStats := stats.MessagesReceived + statsMutex.RUnlock() + + if len(currentStats) > 0 { + logger.Debug("Message statistics", "stats", currentStats) + } + } + } + } +} + +func handleMessage(pkt *paho.Publish, logger *slog.Logger, + stats *weather.TopicStats, mutex *sync.RWMutex) { + + // Update statistics + mutex.Lock() + stats.MessagesReceived[pkt.Topic]++ + stats.LastMessageTime[pkt.Topic] = time.Now() + mutex.Unlock() + + // Route based on topic pattern + switch { + case strings.HasPrefix(pkt.Topic, "weather/obs/"): + processObservationMessage(pkt, logger) + case strings.HasPrefix(pkt.Topic, "weather/forecast/"): + processForecastMessage(pkt, logger) + default: + logger.Warn("Unknown topic pattern", "topic", pkt.Topic) + // Try to decode as generic JSON + var data map[string]interface{} + if err := json.Unmarshal(pkt.Payload, &data); err == nil { + logger.Info("Generic message", "topic", pkt.Topic, "data", data) + } else { + logger.Info("Raw message", "topic", pkt.Topic, "payload", string(pkt.Payload)) + } + } +} + +func processObservationMessage(pkt *paho.Publish, logger *slog.Logger) { + var obs weather.Observation + if err := json.Unmarshal(pkt.Payload, &obs); err != nil { + logger.Error("Failed to parse observation", + "error", err, + "topic", pkt.Topic) + return + } + + // Extract user properties + userProps := make(map[string]string) + if pkt.Properties != nil { + for _, prop := range pkt.Properties.User { + userProps[prop.Key] = prop.Value + } + } + + // Format value for display + valueStr := "null" + if obs.Value != nil { + valueStr = fmt.Sprintf("%.2f", *obs.Value) + } + + logger.Info("Observation received", + "topic", pkt.Topic, + "station", obs.Station, + "parameter", obs.Parameter, + "time", obs.Time.Format(time.RFC3339), + "value", valueStr, + "user_properties", userProps, + "retain", pkt.Retain, + "qos", pkt.QoS) +} + +func processForecastMessage(pkt *paho.Publish, logger *slog.Logger) { + var fc weather.ForecastValue + if err := json.Unmarshal(pkt.Payload, &fc); err != nil { + logger.Error("Failed to parse forecast", + "error", err, + "topic", pkt.Topic) + return + } + + // Extract user properties + userProps := make(map[string]string) + if pkt.Properties != nil { + for _, prop := range pkt.Properties.User { + userProps[prop.Key] = prop.Value + } + } + + // Format value for display + valueStr := "null" + if fc.Value != nil { + valueStr = fmt.Sprintf("%.2f", *fc.Value) + } + + logger.Info("Forecast received", + "topic", pkt.Topic, + "model", fc.Model, + "parameter", fc.Parameter, + "run_time", fc.RunTime.Format(time.RFC3339), + "forecast_time", fc.ForecastTime.Format(time.RFC3339), + "location", fmt.Sprintf("(%.4f,%.4f)", fc.Location.Lat, fc.Location.Lon), + "value", valueStr, + "user_properties", userProps, + "retain", pkt.Retain, + "qos", pkt.QoS) +} @@ -1,4 +1,4 @@ -module pub +module hub go 1.25.5 diff --git a/internal/weather/config.go b/internal/weather/config.go new file mode 100644 index 0000000..6450da4 --- /dev/null +++ b/internal/weather/config.go @@ -0,0 +1,102 @@ +package weather + +import ( + "time" +) + +// PublisherConfig configuration for the publisher +type PublisherConfig struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT"` + + // Observation + StationID int `env:"STATION_ID"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` + ObsTimestep int `env:"OBS_TIMESTEP"` + WatermarkFile string `env:"WATERMARK_FILE"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY"` + HelLat float64 `env:"HEL_LAT"` + HelLon float64 `env:"HEL_LON"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` +} + +// SubscriberConfig configuration for the subscriber +type SubscriberConfig struct { + // MQTT Configuration + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Subscription Configuration + Topics []string `env:"SUBSCRIBE_TOPICS"` + QoS int `env:"SUBSCRIBE_QOS"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + EnableDebug bool `env:"ENABLE_DEBUG"` +} + +// LoadPublisherConfig loads publisher configuration from environment +func LoadPublisherConfig() *PublisherConfig { + cfg := &PublisherConfig{ + FMIEndpoint: GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: ParseInt(GetEnv("STATION_ID", "100968")), + ObsPollEvery: ParseDuration(GetEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: ParseDuration(GetEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: ParseInt(GetEnv("OBS_TIMESTEP", "60")), + WatermarkFile: GetEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: ParseDuration(GetEnv("FC_POLL_EVERY", "1h")), + HelLat: ParseFloat(GetEnv("HEL_LAT", "60.1699")), + HelLon: ParseFloat(GetEnv("HEL_LON", "24.9384")), + MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: GetEnv("MQTT_USERNAME", ""), + MQTTPassword: GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")), + LogLevel: GetEnv("LOG_LEVEL", "info"), + HTTPTimeout: ParseDuration(GetEnv("HTTP_TIMEOUT", "30s")), + } + + return cfg +} + +// LoadSubscriberConfig loads subscriber configuration from environment +func LoadSubscriberConfig() *SubscriberConfig { + qos := ParseInt(GetEnv("SUBSCRIBE_QOS", "1")) + if qos < 0 || qos > 2 { + qos = 1 + } + + cfg := &SubscriberConfig{ + MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-subscriber"), + MQTTUsername: GetEnv("MQTT_USERNAME", ""), + MQTTPassword: GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")), + Topics: ParseTopics(GetEnv("SUBSCRIBE_TOPICS", "")), + QoS: qos, + LogLevel: GetEnv("LOG_LEVEL", "info"), + EnableDebug: GetEnv("ENABLE_DEBUG", "false") == "true", + } + + return cfg +} diff --git a/internal/weather/fmi.go b/internal/weather/fmi.go new file mode 100644 index 0000000..52c9f63 --- /dev/null +++ b/internal/weather/fmi.go @@ -0,0 +1,258 @@ +package weather + +import ( + "bytes" + "context" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +// BuildObservationsURL constructs the URL for fetching observations from FMI. +func BuildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") + q.Set("fmisid", fmt.Sprintf("%d", stationID)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + q.Set("timestep", fmt.Sprintf("%d", timestep)) + + return endpoint + "?" + q.Encode() +} + +// BuildForecastURL constructs the URL for fetching forecasts from FMI. +func BuildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + + return endpoint + "?" + q.Encode() +} + +// FetchData fetches data from the given URL with a timeout. +func FetchData(url string, timeout time.Duration) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/xml") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + return io.ReadAll(resp.Body) +} + +// ParseObservations parses the XML response from FMI observations query. +func ParseObservations(data []byte, stationID int, minTime time.Time) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + ts time.Time + maxObserved = minTime + out []Observation + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, maxObserved, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + ts = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + ts = parsedTime + } + } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + out = append(out, Observation{ + Station: stationID, + Parameter: param, + Time: ts, + Value: valuePtr, + }) + + if ts.After(maxObserved) { + maxObserved = ts + } + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, maxObserved, nil +} + +// ParseForecast parses the XML response from FMI forecast query. +func ParseForecast(data []byte, lat, lon float64) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, runTime, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "resultTime": + var s string + if err := dec.DecodeElement(&s, &el); err == nil { + if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { + runTime = parsedTime + } + } + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + fcTime = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + fcTime = parsedTime + } + } else if inValue && inTVP && !fcTime.IsZero() { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + fc := ForecastValue{ + Location: struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + }{lat, lon}, + Model: "harmonie", + RunTime: runTime, + ForecastTime: fcTime, + Parameter: param, + Value: valuePtr, + } + out = append(out, fc) + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, runTime, nil +} diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go new file mode 100644 index 0000000..211b441 --- /dev/null +++ b/internal/weather/mqtt.go @@ -0,0 +1,76 @@ +package weather + +import ( + "context" + "crypto/tls" + "fmt" + "net/url" + "os" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" +) + +// MQTTConfig holds MQTT connection configuration +type MQTTConfig struct { + Broker string + ClientID string + Username string + Password string + KeepAlive time.Duration + SessionExpiry time.Duration + Timeout time.Duration + OnPublishReceived []func(paho.PublishReceived) (bool, error) +} + +// CreateMQTTClient creates a MQTT client with the given configuration +func CreateMQTTClient(ctx context.Context, cfg MQTTConfig, + onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) { + + serverURL, err := url.Parse(cfg.Broker) + if err != nil { + return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.KeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()), + OnPublishReceived: cfg.OnPublishReceived, + OnServerDisconnect: func(d *paho.Disconnect) { + + // Default disconnect handler + }, + }, + ConnectUsername: cfg.Username, + ConnectPassword: []byte(cfg.Password), + ConnectTimeout: cfg.Timeout, + } + + cliCfg.CleanStartOnInitialConnection = false + cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds()) + + if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { + cliCfg.TlsCfg = &tls.Config{ + MinVersion: tls.VersionTLS12, + } + } + + if onConnect != nil { + cliCfg.OnConnectionUp = onConnect + } + + cm, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return nil, fmt.Errorf("failed to create MQTT connection: %w", err) + } + + err = cm.AwaitConnection(ctx) + if err != nil { + return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) + } + + return cm, nil +} diff --git a/internal/weather/types.go b/internal/weather/types.go new file mode 100644 index 0000000..5563bfa --- /dev/null +++ b/internal/weather/types.go @@ -0,0 +1,44 @@ +package weather + +import ( + "encoding/json" + "math" + "time" +) + +// Observation represents a weather observation from a station +type Observation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value *float64 `json:"value,omitempty"` +} + +// ForecastValue represents a weather forecast value +type ForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value *float64 `json:"value,omitempty"` +} + +// JSONFloat64 handles NaN values properly in JSON +type JSONFloat64 float64 + +func (f JSONFloat64) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + return []byte("null"), nil + } + return json.Marshal(float64(f)) +} + +// TopicStats tracks message statistics for subscribers +type TopicStats struct { + MessagesReceived map[string]int + LastMessageTime map[string]time.Time +} diff --git a/internal/weather/utils.go b/internal/weather/utils.go new file mode 100644 index 0000000..b59949a --- /dev/null +++ b/internal/weather/utils.go @@ -0,0 +1,82 @@ +// internal/weather/utils.go +package weather + +import ( + "os" + "strconv" + "strings" + "time" +) + +// GetEnv gets environment variable with default +func GetEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +// ParseDuration parses duration string +func ParseDuration(s string) time.Duration { + d, err := time.ParseDuration(s) + if err != nil { + return 5 * time.Minute + } + return d +} + +// ParseInt parses integer string +func ParseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + return 0 + } + return i +} + +// ParseFloat parses float string +func ParseFloat(s string) float64 { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0 + } + return f +} + +// ParseTopics parses comma-separated topics +func ParseTopics(s string) []string { + if s == "" { + return []string{ + "weather/obs/fmi/#", + "weather/forecast/fmi/#", + } + } + return strings.Split(s, ",") +} + +// LoadWatermark loads last observation time from file +func LoadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +// StoreWatermark stores observation time to file +func StoreWatermark(filename string, t time.Time) error { + data := []byte(strconv.FormatInt(t.Unix(), 10)) + return os.WriteFile(filename, data, 0644) +} + +// InitTopicStats initializes topic statistics +func InitTopicStats() *TopicStats { + return &TopicStats{ + MessagesReceived: make(map[string]int), + LastMessageTime: make(map[string]time.Time), + } +} diff --git a/main.go b/main.go deleted file mode 100644 index c512022..0000000 --- a/main.go +++ /dev/null @@ -1,792 +0,0 @@ -package main - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "encoding/xml" - "fmt" - "io" - "log/slog" - "math" - "net/http" - "net/url" - "os" - "strconv" - "strings" - "time" - - "github.com/eclipse/paho.golang/autopaho" - "github.com/eclipse/paho.golang/paho" - "github.com/joho/godotenv" -) - -type Config struct { - // FMI API - FMIEndpoint string `env:"FMI_ENDPOINT"` - - // Observation - StationID int `env:"STATION_ID"` - ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` - ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` - ObsTimestep int `env:"OBS_TIMESTEP"` - WatermarkFile string `env:"WATERMARK_FILE"` - - // Forecast - FcPollEvery time.Duration `env:"FC_POLL_EVERY"` - HelLat float64 `env:"HEL_LAT"` - HelLon float64 `env:"HEL_LON"` - - // MQTT - MQTTBroker string `env:"MQTT_BROKER"` - MQTTClientID string `env:"MQTT_CLIENT_ID"` - MQTTUsername string `env:"MQTT_USERNAME"` - MQTTPassword string `env:"MQTT_PASSWORD"` - MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` - MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` - - // Application - LogLevel string `env:"LOG_LEVEL"` - HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` -} - -type Observation struct { - Station int `json:"station"` - Parameter string `json:"parameter"` - Time time.Time `json:"time"` - Value *float64 `json:"value,omitempty"` -} - -type ForecastValue struct { - Location struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - } `json:"location"` - - Model string `json:"model"` - RunTime time.Time `json:"run_time"` - ForecastTime time.Time `json:"forecast_time"` - Parameter string `json:"parameter"` - Value *float64 `json:"value,omitempty"` -} - -type JSONFloat64 float64 - -func (f JSONFloat64) MarshalJSON() ([]byte, error) { - if math.IsNaN(float64(f)) { - return []byte("null"), nil - } - return json.Marshal(float64(f)) -} - -func loadConfig() *Config { - // Load .env file if it exists - _ = godotenv.Load() - - cfg := &Config{ - FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), - StationID: parseInt(getEnv("STATION_ID", "100968")), - ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), - ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), - ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), - WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), - FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), - HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), - HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), - MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), - MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), - MQTTUsername: getEnv("MQTT_USERNAME", ""), - MQTTPassword: getEnv("MQTT_PASSWORD", ""), - MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")), - MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")), - LogLevel: getEnv("LOG_LEVEL", "debug"), - HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), - } - - return cfg -} - -func getEnv(key, defaultValue string) string { - if value := os.Getenv(key); value != "" { - return value - } - return defaultValue -} - -func parseDuration(s string) time.Duration { - d, err := time.ParseDuration(s) - if err != nil { - return 5 * time.Minute // Default fallback - } - return d -} - -func parseInt(s string) int { - i, err := strconv.Atoi(s) - if err != nil { - return 0 - } - return i -} - -func parseFloat(s string) float64 { - f, err := strconv.ParseFloat(s, 64) - if err != nil { - return 0 - } - return f -} - -func parseLogLevel(level string) slog.Level { - switch strings.ToLower(level) { - case "debug": - return slog.LevelDebug - case "warn", "warning": - return slog.LevelWarn - case "error": - return slog.LevelError - default: - return slog.LevelInfo - } -} - -func loadWatermark(filename string) time.Time { - b, err := os.ReadFile(filename) - if err != nil { - return time.Now().UTC().Add(-24 * time.Hour) - } - sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) - if err != nil { - return time.Now().UTC().Add(-24 * time.Hour) - } - return time.Unix(sec, 0).UTC() -} - -func storeWatermark(filename string, t time.Time) error { - data := []byte(fmt.Sprintf("%d", t.Unix())) - return os.WriteFile(filename, data, 0644) -} - -func fetchData(url string, timeout time.Duration) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "application/xml") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) - } - - return io.ReadAll(resp.Body) -} - -func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { - q := url.Values{} - q.Set("service", "WFS") - q.Set("version", "2.0.0") - q.Set("request", "GetFeature") - q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") - q.Set("fmisid", fmt.Sprintf("%d", stationID)) - q.Set("starttime", start.Format(time.RFC3339)) - q.Set("endtime", end.Format(time.RFC3339)) - q.Set("timestep", fmt.Sprintf("%d", timestep)) - - return endpoint + "?" + q.Encode() -} - -func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { - dec := xml.NewDecoder(bytes.NewReader(data)) - - var ( - param string - ts time.Time - maxObserved = minTime - out []Observation - inTVP bool - inTime bool - inValue bool - ) - - for { - tok, err := dec.Token() - if err == io.EOF { - break - } - if err != nil { - return nil, maxObserved, err - } - - switch el := tok.(type) { - case xml.StartElement: - switch el.Name.Local { - case "MeasurementTimeseries": - for _, a := range el.Attr { - if a.Name.Local == "id" { - parts := strings.Split(a.Value, "-") - if len(parts) > 0 { - param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) - } - } - } - case "MeasurementTVP": - inTVP = true - ts = time.Time{} - case "time": - if inTVP { - inTime = true - } - case "value": - if inTVP { - inValue = true - } - } - - case xml.CharData: - if inTime && inTVP { - s := strings.TrimSpace(string(el)) - if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { - ts = parsedTime - } - } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { - s := strings.TrimSpace(string(el)) - var valuePtr *float64 - - if !strings.EqualFold(s, "NaN") && s != "" { - if value, err := strconv.ParseFloat(s, 64); err == nil { - valuePtr = &value - } - } - - out = append(out, Observation{ - Station: stationID, - Parameter: param, - Time: ts, - Value: valuePtr, - }) - - if ts.After(maxObserved) { - maxObserved = ts - } - } - - case xml.EndElement: - switch el.Name.Local { - case "MeasurementTVP": - inTVP = false - case "time": - inTime = false - case "value": - inValue = false - } - } - } - - return out, maxObserved, nil -} - -func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { - q := url.Values{} - q.Set("service", "WFS") - q.Set("version", "2.0.0") - q.Set("request", "GetFeature") - q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") - q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) - q.Set("starttime", start.Format(time.RFC3339)) - q.Set("endtime", end.Format(time.RFC3339)) - - return endpoint + "?" + q.Encode() -} - -func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { - dec := xml.NewDecoder(bytes.NewReader(data)) - - var ( - param string - runTime time.Time - fcTime time.Time - out []ForecastValue - inTVP bool - inTime bool - inValue bool - ) - - for { - tok, err := dec.Token() - if err == io.EOF { - break - } - if err != nil { - return nil, runTime, err - } - - switch el := tok.(type) { - case xml.StartElement: - switch el.Name.Local { - case "resultTime": - var s string - if err := dec.DecodeElement(&s, &el); err == nil { - if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { - runTime = parsedTime - } - } - case "MeasurementTimeseries": - for _, a := range el.Attr { - if a.Name.Local == "id" { - parts := strings.Split(a.Value, "-") - if len(parts) > 0 { - param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) - } - } - } - case "MeasurementTVP": - inTVP = true - fcTime = time.Time{} - case "time": - if inTVP { - inTime = true - } - case "value": - if inTVP { - inValue = true - } - } - - case xml.CharData: - if inTime && inTVP { - s := strings.TrimSpace(string(el)) - if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { - fcTime = parsedTime - } - } else if inValue && inTVP && !fcTime.IsZero() { - s := strings.TrimSpace(string(el)) - var valuePtr *float64 - - if !strings.EqualFold(s, "NaN") && s != "" { - if value, err := strconv.ParseFloat(s, 64); err == nil { - valuePtr = &value - } - } - - fc := ForecastValue{ - Location: struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - }{lat, lon}, - Model: "harmonie", - RunTime: runTime, - ForecastTime: fcTime, - Parameter: param, - Value: valuePtr, - } - out = append(out, fc) - } - - case xml.EndElement: - switch el.Name.Local { - case "MeasurementTVP": - inTVP = false - case "time": - inTime = false - case "value": - inValue = false - } - } - } - - return out, runTime, nil -} - -func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) { - serverURL, err := url.Parse(cfg.MQTTBroker) - if err != nil { - return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) - } - - // Client configuration - cliCfg := autopaho.ClientConfig{ - ServerUrls: []*url.URL{serverURL}, - KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), - ClientConfig: paho.ClientConfig{ - ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), - OnPublishReceived: []func(paho.PublishReceived) (bool, error){ - func(pr paho.PublishReceived) (bool, error) { - logger.Debug("Received message", "topic", pr.Packet.Topic) - return true, nil - }, - }, - // Set OnDisconnect here - OnServerDisconnect: func(d *paho.Disconnect) { - if d != nil { - logger.Warn("MQTT disconnected", "reason", d.ReasonCode) - } else { - logger.Info("MQTT disconnected gracefully") - } - }, - }, - ConnectUsername: cfg.MQTTUsername, - ConnectPassword: []byte(cfg.MQTTPassword), - ConnectTimeout: cfg.HTTPTimeout, - } - - // MQTT v5 Connect properties - these need to be pointers - cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds()) - cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()) - cliCfg.CleanStartOnInitialConnection = false - - // Last Will and Testament (LWT) - cliCfg.WillMessage = &paho.WillMessage{ - Topic: "weather/clients/status", - Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)), - QoS: 1, - Retain: true, - } - - // TLS configuration (if using TLS) - if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { - cliCfg.TlsCfg = &tls.Config{ - MinVersion: tls.VersionTLS12, - } - } - - // Callbacks - cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { - logger.Info("MQTT v5 connected", - "broker", serverURL.Host, - "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil, - "keep_alive", cfg.MQTTKeepAlive) - } - - cliCfg.OnConnectError = func(err error) { - logger.Error("MQTT connection error", "error", err) - } - - // Connect to broker - cm, err := autopaho.NewConnection(ctx, cliCfg) - if err != nil { - return nil, fmt.Errorf("failed to create MQTT connection: %w", err) - } - - // Wait for connection to be established - err = cm.AwaitConnection(ctx) - if err != nil { - return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) - } - - return cm, nil -} - -func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error { - topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) - - // Safe JSON structure - type SafeObservation struct { - Station int `json:"station"` - Parameter string `json:"parameter"` - Time time.Time `json:"time"` - Value *JSONFloat64 `json:"value,omitempty"` - } - - var safeValue *JSONFloat64 - if obs.Value != nil { - v := JSONFloat64(*obs.Value) - safeValue = &v - } - - safeObs := SafeObservation{ - Station: obs.Station, - Parameter: obs.Parameter, - Time: obs.Time, - Value: safeValue, - } - - b, err := json.Marshal(safeObs) - if err != nil { - return err - } - - // MQTT v5 publish properties - props := &paho.PublishProperties{ - User: []paho.UserProperty{ - {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, - {Key: "parameter", Value: obs.Parameter}, - {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, - {Key: "data_type", Value: "observation"}, - {Key: "source", Value: "fmi"}, - }, - } - - pb := &paho.Publish{ - Topic: topic, - QoS: 1, - Retain: false, - Payload: b, - Properties: props, - } - - // Publish with context timeout - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err = cm.Publish(ctx, pb) - if err != nil { - return fmt.Errorf("failed to publish observation: %w", err) - } - - logger.Debug("Published observation", - "topic", topic, - "station", obs.Station, - "parameter", obs.Parameter, - "value", obs.Value, - "time", obs.Time) - - return nil -} - -func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error { - topic := fmt.Sprintf( - "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", - fc.RunTime.Format(time.RFC3339), - fc.Parameter, - ) - - // Safe JSON structure - type SafeForecastValue struct { - Location struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - } `json:"location"` - Model string `json:"model"` - RunTime time.Time `json:"run_time"` - ForecastTime time.Time `json:"forecast_time"` - Parameter string `json:"parameter"` - Value *JSONFloat64 `json:"value,omitempty"` - } - - var safeValue *JSONFloat64 - if fc.Value != nil { - v := JSONFloat64(*fc.Value) - safeValue = &v - } - - safeFc := SafeForecastValue{ - Location: fc.Location, - Model: fc.Model, - RunTime: fc.RunTime, - ForecastTime: fc.ForecastTime, - Parameter: fc.Parameter, - Value: safeValue, - } - - b, err := json.Marshal(safeFc) - if err != nil { - return err - } - - // MQTT v5 publish properties - props := &paho.PublishProperties{ - User: []paho.UserProperty{ - {Key: "model", Value: fc.Model}, - {Key: "parameter", Value: fc.Parameter}, - {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, - {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, - {Key: "data_type", Value: "forecast"}, - {Key: "source", Value: "fmi"}, - {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, - {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, - }, - } - - pb := &paho.Publish{ - Topic: topic, - QoS: 1, - Retain: true, // Forecasts are retained since they don't change - Payload: b, - Properties: props, - } - - // Publish with context timeout - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err = cm.Publish(ctx, pb) - if err != nil { - return fmt.Errorf("failed to publish forecast: %w", err) - } - - logger.Debug("Published forecast", - "topic", topic, - "parameter", fc.Parameter, - "run_time", fc.RunTime, - "forecast_time", fc.ForecastTime) - - return nil -} - -func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { - // Load initial state - lastObs := loadWatermark(cfg.WatermarkFile) - lastFcRun := time.Time{} - - // Create MQTT client - mqttClient, err := createMQTTClient(ctx, cfg, logger) - if err != nil { - return fmt.Errorf("failed to connect to MQTT: %w", err) - } - defer func() { - // Graceful disconnect - discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := mqttClient.Disconnect(discCtx); err != nil { - logger.Error("Failed to disconnect MQTT", "error", err) - } - }() - - // Create timers - obsTimer := time.NewTimer(0) // Start immediately - fcTimer := time.NewTimer(0) // Start immediately - - defer obsTimer.Stop() - defer fcTimer.Stop() - - logger.Info("Poller started", - "obs_interval", cfg.ObsPollEvery, - "fc_interval", cfg.FcPollEvery, - "mqtt_broker", cfg.MQTTBroker) - - for { - select { - case <-ctx.Done(): - logger.Info("Poller stopping") - return nil - - case <-obsTimer.C: - // Poll observations - obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) - if obsEnd.After(lastObs) { - logger.Info("Polling observations", "from", lastObs, "to", obsEnd) - - url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) - data, err := fetchData(url, cfg.HTTPTimeout) - if err != nil { - logger.Error("Failed to fetch observations", "error", err) - } else { - obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger) - - if err != nil { - logger.Error("Failed to parse observations", "error", err) - } else if len(obs) > 0 { - // Publish observations - successCount := 0 - for _, o := range obs { - if err := publishObservation(mqttClient, o, logger); err != nil { - logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) - } else { - successCount++ - } - } - - // Update watermark - lastObs = maxT - if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { - logger.Error("Failed to store watermark", "error", err) - } - - logger.Info("Published observations", - "successful", successCount, - "total", len(obs), - "new_watermark", lastObs) - } else { - logger.Debug("No new observations found") - } - } - } - obsTimer.Reset(cfg.ObsPollEvery) - - case <-fcTimer.C: - // Poll forecast - now := time.Now().UTC() - start := now - end := now.Add(48 * time.Hour) - - logger.Info("Polling forecast", "from", start, "to", end) - - url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) - data, err := fetchData(url, cfg.HTTPTimeout) - if err != nil { - logger.Error("Failed to fetch forecast", "error", err) - } else { - fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger) - - if err != nil { - logger.Error("Failed to parse forecast", "error", err) - } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { - // Publish forecast - successCount := 0 - for _, f := range fc { - if err := publishForecast(mqttClient, f, logger); err != nil { - logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) - } else { - successCount++ - } - } - - lastFcRun = runTime - logger.Info("Published forecast", - "successful", successCount, - "total", len(fc), - "run_time", runTime) - } else { - logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) - } - } - fcTimer.Reset(cfg.FcPollEvery) - } - } -} - -func main() { - // Load configuration - cfg := loadConfig() - - // Setup logging - logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: parseLogLevel(cfg.LogLevel), - })) - - // Create context for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Setup signal handling for graceful shutdown - sigChan := make(chan os.Signal, 1) - // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - // For now, we don't handle signals in this simplified version - // In production, you'd want to handle SIGINT and SIGTERM - <-sigChan - logger.Info("Received shutdown signal") - cancel() - }() - - // Run the poller - logger.Info("Starting weather data poller with MQTT v5") - if err := runPoller(ctx, cfg, logger); err != nil { - logger.Error("Poller failed", "error", err) - os.Exit(1) - } - - logger.Info("Poller stopped gracefully") -} diff --git a/pub.service b/pub.service index 34eb743..9cfa51f 100644 --- a/pub.service +++ b/pub.service @@ -1,4 +1,4 @@ -# /etc/systemd/system/atom-publisher.service +# /etc/systemd/system/pub.service [Unit] Description=Data Publication HUB After=network.target @@ -10,7 +10,7 @@ Type=simple Restart=always RestartSec=10 User=publisher -ExecStart=/usr/local/bin/hub +ExecStart=/usr/local/bin/pub MemoryHigh=20M MemoryMax=30M |
