aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md30
-rw-r--r--cmd/publisher/main.go417
-rw-r--r--cmd/subscriber/main.go310
-rw-r--r--go.mod2
-rw-r--r--internal/weather/config.go102
-rw-r--r--internal/weather/fmi.go258
-rw-r--r--internal/weather/mqtt.go76
-rw-r--r--internal/weather/types.go44
-rw-r--r--internal/weather/utils.go82
-rw-r--r--main.go792
-rw-r--r--pub.service4
11 files changed, 1322 insertions, 795 deletions
diff --git a/README.md b/README.md
index 5979c0b..1c61a59 100644
--- a/README.md
+++ b/README.md
@@ -3,3 +3,33 @@
## Downloading source data
curl -G "https://opendata.fmi.fi/wfs" --data-urlencode "service=WFS" --data-urlencode "version=2.0.0" --data-urlencode "request=GetFeature" --data-urlencode "storedquery_id=fmi::observations::weather::timevaluepair" --data-urlencode "fmisid=100968" --data-urlencode "starttime=2025-12-28T00:00:00Z" --data-urlencode "endtime=2025-12-28T01:00:00Z" --data-urlencode "timestep=60" -o observation.xml
+
+### Instructions
+
+Build
+
+```bash
+go build -o bin/publisher ./cmd/publisher
+go build -o bin/subscriber ./cmd/subscriber
+```
+
+Run
+
+```bash
+go run ./cmd/publisher
+go run ./cmd/subscriber
+```
+
+Install
+
+```bash
+go install ./cmd/publisher
+go install ./cmd/subscriber
+```
+
+Then run from anywhere
+
+```bash
+publisher
+subscriber
+```
diff --git a/cmd/publisher/main.go b/cmd/publisher/main.go
new file mode 100644
index 0000000..ab84616
--- /dev/null
+++ b/cmd/publisher/main.go
@@ -0,0 +1,417 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/joho/godotenv"
+ "hub/internal/weather"
+)
+
+type PublisherConfig struct {
+ // FMI API
+ FMIEndpoint string `env:"FMI_ENDPOINT"`
+
+ // Observation
+ StationID int `env:"STATION_ID"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
+ ObsTimestep int `env:"OBS_TIMESTEP"`
+ WatermarkFile string `env:"WATERMARK_FILE"`
+
+ // Forecast
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
+ HelLat float64 `env:"HEL_LAT"`
+ HelLon float64 `env:"HEL_LON"`
+
+ // MQTT
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
+}
+
+func loadPublisherConfig() *PublisherConfig {
+ _ = godotenv.Load()
+
+ return &PublisherConfig{
+ FMIEndpoint: weather.GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: weather.ParseInt(weather.GetEnv("STATION_ID", "100968")),
+ ObsPollEvery: weather.ParseDuration(weather.GetEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: weather.ParseDuration(weather.GetEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: weather.ParseInt(weather.GetEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: weather.GetEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: weather.ParseDuration(weather.GetEnv("FC_POLL_EVERY", "1h")),
+ HelLat: weather.ParseFloat(weather.GetEnv("HEL_LAT", "60.1699")),
+ HelLon: weather.ParseFloat(weather.GetEnv("HEL_LON", "24.9384")),
+ MQTTBroker: weather.GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: weather.GetEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: weather.GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: weather.GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: weather.ParseDuration(weather.GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: weather.ParseDuration(weather.GetEnv("MQTT_SESSION_EXP", "1h")),
+ LogLevel: weather.GetEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: weather.ParseDuration(weather.GetEnv("HTTP_TIMEOUT", "30s")),
+ }
+}
+
+func loadWatermark(filename string) time.Time {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ return time.Unix(sec, 0).UTC()
+}
+
+func storeWatermark(filename string, t time.Time) error {
+ data := []byte(fmt.Sprintf("%d", t.Unix()))
+ return os.WriteFile(filename, data, 0644)
+}
+
+func publishObservation(cm *autopaho.ConnectionManager, obs weather.Observation, logger *slog.Logger) error {
+ topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
+
+ // Safe JSON structure
+ type SafeObservation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value *weather.JSONFloat64 `json:"value,omitempty"`
+ }
+
+ var safeValue *weather.JSONFloat64
+ if obs.Value != nil {
+ v := weather.JSONFloat64(*obs.Value)
+ safeValue = &v
+ }
+
+ safeObs := SafeObservation{
+ Station: obs.Station,
+ Parameter: obs.Parameter,
+ Time: obs.Time,
+ Value: safeValue,
+ }
+
+ b, err := json.Marshal(safeObs)
+ if err != nil {
+ return err
+ }
+
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)},
+ {Key: "parameter", Value: obs.Parameter},
+ {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)},
+ {Key: "data_type", Value: "observation"},
+ {Key: "source", Value: "fmi"},
+ },
+ }
+
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: false,
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish observation: %w", err)
+ }
+
+ logger.Debug("Published observation",
+ "topic", topic,
+ "station", obs.Station,
+ "parameter", obs.Parameter,
+ "value", obs.Value,
+ "time", obs.Time)
+
+ return nil
+}
+
+func publishForecast(cm *autopaho.ConnectionManager, fc weather.ForecastValue, logger *slog.Logger) error {
+ topic := fmt.Sprintf(
+ "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
+ fc.RunTime.Format(time.RFC3339),
+ fc.Parameter,
+ )
+
+ // Safe JSON structure
+ type SafeForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value *weather.JSONFloat64 `json:"value,omitempty"`
+ }
+
+ var safeValue *weather.JSONFloat64
+ if fc.Value != nil {
+ v := weather.JSONFloat64(*fc.Value)
+ safeValue = &v
+ }
+
+ safeFc := SafeForecastValue{
+ Location: fc.Location,
+ Model: fc.Model,
+ RunTime: fc.RunTime,
+ ForecastTime: fc.ForecastTime,
+ Parameter: fc.Parameter,
+ Value: safeValue,
+ }
+
+ b, err := json.Marshal(safeFc)
+ if err != nil {
+ return err
+ }
+
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "model", Value: fc.Model},
+ {Key: "parameter", Value: fc.Parameter},
+ {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)},
+ {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)},
+ {Key: "data_type", Value: "forecast"},
+ {Key: "source", Value: "fmi"},
+ {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)},
+ {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)},
+ },
+ }
+
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: true, // Forecasts are retained since they don't change
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish forecast: %w", err)
+ }
+
+ logger.Debug("Published forecast",
+ "topic", topic,
+ "parameter", fc.Parameter,
+ "run_time", fc.RunTime,
+ "forecast_time", fc.ForecastTime)
+
+ return nil
+}
+
+func runPublisher(ctx context.Context, cfg *PublisherConfig, logger *slog.Logger) error {
+ // Load initial state
+ lastObs := loadWatermark(cfg.WatermarkFile)
+ lastFcRun := time.Time{}
+
+ // Create MQTT client
+ mqttCfg := weather.MQTTConfig{
+ Broker: cfg.MQTTBroker,
+ ClientID: cfg.MQTTClientID,
+ Username: cfg.MQTTUsername,
+ Password: cfg.MQTTPassword,
+ KeepAlive: cfg.MQTTKeepAlive,
+ SessionExpiry: cfg.MQTTSessionExp,
+ Timeout: cfg.HTTPTimeout,
+ }
+
+ mqttClient, err := weather.CreateMQTTClient(ctx, mqttCfg, func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ logger.Info("MQTT v5 connected",
+ "session_present", connAck.SessionPresent,
+ "keep_alive", cfg.MQTTKeepAlive)
+ })
+ if err != nil {
+ return fmt.Errorf("failed to connect to MQTT: %w", err)
+ }
+ defer func() {
+ discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := mqttClient.Disconnect(discCtx); err != nil {
+ logger.Error("Failed to disconnect MQTT", "error", err)
+ }
+ }()
+
+ // Create timers
+ obsTimer := time.NewTimer(0) // Start immediately
+ fcTimer := time.NewTimer(0) // Start immediately
+
+ defer obsTimer.Stop()
+ defer fcTimer.Stop()
+
+ logger.Info("Poller started",
+ "obs_interval", cfg.ObsPollEvery,
+ "fc_interval", cfg.FcPollEvery,
+ "mqtt_broker", cfg.MQTTBroker)
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Info("Poller stopping")
+ return nil
+
+ case <-obsTimer.C:
+ // Poll observations
+ obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag)
+ if obsEnd.After(lastObs) {
+ logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
+
+ url := weather.BuildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
+ data, err := weather.FetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch observations", "error", err)
+ } else {
+ obs, maxT, err := weather.ParseObservations(data, cfg.StationID, lastObs)
+
+ if err != nil {
+ logger.Error("Failed to parse observations", "error", err)
+ } else if len(obs) > 0 {
+ // Publish observations
+ successCount := 0
+ for _, o := range obs {
+ if err := publishObservation(mqttClient, o, logger); err != nil {
+ logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter)
+ } else {
+ successCount++
+ }
+ }
+
+ // Update watermark
+ lastObs = maxT
+ if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil {
+ logger.Error("Failed to store watermark", "error", err)
+ }
+
+ logger.Info("Published observations",
+ "successful", successCount,
+ "total", len(obs),
+ "new_watermark", lastObs)
+ } else {
+ logger.Debug("No new observations found")
+ }
+ }
+ }
+ obsTimer.Reset(cfg.ObsPollEvery)
+
+ case <-fcTimer.C:
+ // Poll forecast
+ now := time.Now().UTC()
+ start := now
+ end := now.Add(48 * time.Hour)
+
+ logger.Info("Polling forecast", "from", start, "to", end)
+
+ url := weather.BuildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
+ data, err := weather.FetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch forecast", "error", err)
+ } else {
+ fc, runTime, err := weather.ParseForecast(data, cfg.HelLat, cfg.HelLon)
+
+ if err != nil {
+ logger.Error("Failed to parse forecast", "error", err)
+ } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
+ // Publish forecast
+ successCount := 0
+ for _, f := range fc {
+ if err := publishForecast(mqttClient, f, logger); err != nil {
+ logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter)
+ } else {
+ successCount++
+ }
+ }
+
+ lastFcRun = runTime
+ logger.Info("Published forecast",
+ "successful", successCount,
+ "total", len(fc),
+ "run_time", runTime)
+ } else {
+ logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun)
+ }
+ }
+ fcTimer.Reset(cfg.FcPollEvery)
+ }
+ }
+}
+
+func main() {
+ cfg := loadPublisherConfig()
+
+ // Setup logging
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: parseLogLevel(cfg.LogLevel),
+ }))
+
+ // Create context for graceful shutdown
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Setup signal handling
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ sig := <-sigChan
+ logger.Info("Received signal", "signal", sig)
+ cancel()
+ }()
+
+ // Run the publisher
+ logger.Info("Starting weather data publisher with MQTT v5")
+ if err := runPublisher(ctx, cfg, logger); err != nil {
+ logger.Error("Publisher failed", "error", err)
+ os.Exit(1)
+ }
+
+ logger.Info("Publisher stopped gracefully")
+}
+
+func parseLogLevel(level string) slog.Level {
+ switch strings.ToLower(level) {
+ case "debug":
+ return slog.LevelDebug
+ case "warn", "warning":
+ return slog.LevelWarn
+ case "error":
+ return slog.LevelError
+ default:
+ return slog.LevelInfo
+ }
+}
diff --git a/cmd/subscriber/main.go b/cmd/subscriber/main.go
new file mode 100644
index 0000000..f561d60
--- /dev/null
+++ b/cmd/subscriber/main.go
@@ -0,0 +1,310 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "net/url"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/joho/godotenv"
+ "hub/internal/weather"
+)
+
+// MessageHandler wraps the function to process incoming messages
+type MessageHandler func(*paho.Publish)
+
+func main() {
+ // Load environment variables
+ _ = godotenv.Load()
+
+ // Load configuration
+ cfg := weather.LoadSubscriberConfig()
+
+ // Setup logging
+ var logLevel slog.Level
+ switch strings.ToLower(cfg.LogLevel) {
+ case "debug":
+ logLevel = slog.LevelDebug
+ case "warn", "warning":
+ logLevel = slog.LevelWarn
+ case "error":
+ logLevel = slog.LevelError
+ default:
+ logLevel = slog.LevelInfo
+ }
+
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: logLevel,
+ }))
+
+ // Create context for graceful shutdown
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Handle signals
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ sig := <-sigChan
+ logger.Info("Received signal", "signal", sig)
+ cancel()
+ }()
+
+ // Run the subscriber
+ logger.Info("Starting MQTT v5 subscriber for FMI weather data",
+ "broker", cfg.MQTTBroker,
+ "topics", cfg.Topics,
+ "qos", cfg.QoS)
+
+ if err := runSubscriber(ctx, cfg, logger); err != nil {
+ logger.Error("Subscriber failed", "error", err)
+ os.Exit(1)
+ }
+
+ logger.Info("Subscriber stopped gracefully")
+}
+
+func runSubscriber(ctx context.Context, cfg *weather.SubscriberConfig, logger *slog.Logger) error {
+ // Initialize statistics
+ stats := weather.InitTopicStats()
+ statsMutex := &sync.RWMutex{}
+
+ // Create MQTT client with custom configuration
+ serverURL, err := url.Parse(cfg.MQTTBroker)
+ if err != nil {
+ return fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ // Create message handler
+ messageHandler := func(pr paho.PublishReceived) (bool, error) {
+ go handleMessage(pr.Packet, logger, stats, statsMutex)
+ return true, nil
+ }
+
+ // Client configuration
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()),
+ OnPublishReceived: []func(paho.PublishReceived) (bool, error){
+ messageHandler,
+ },
+ OnServerDisconnect: func(d *paho.Disconnect) {
+ if d != nil && d.ReasonCode != 0 {
+ logger.Warn("MQTT disconnected", "reason", d.ReasonCode)
+ } else {
+ logger.Info("MQTT disconnected gracefully")
+ }
+ },
+ },
+ ConnectUsername: cfg.MQTTUsername,
+ ConnectPassword: []byte(cfg.MQTTPassword),
+ ConnectTimeout: 30 * time.Second,
+ }
+
+ cliCfg.CleanStartOnInitialConnection = false
+ cliCfg.SessionExpiryInterval = uint32(cfg.MQTTSessionExp.Seconds())
+
+ // Connection callbacks
+ cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ logger.Info("MQTT v5 connected",
+ "session_present", connAck.SessionPresent,
+ "keep_alive", cfg.MQTTKeepAlive)
+
+ // Subscribe to topics after connection is established
+ if len(cfg.Topics) > 0 {
+ subscriptions := make([]paho.SubscribeOptions, len(cfg.Topics))
+ for i, topic := range cfg.Topics {
+ subscriptions[i] = paho.SubscribeOptions{
+ Topic: topic,
+ QoS: byte(cfg.QoS),
+ }
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // FIXED: SubscriptionIdentifier is a pointer
+ subID := int(1)
+ subProps := &paho.SubscribeProperties{
+ SubscriptionIdentifier: &subID,
+ }
+
+ if _, err := cm.Subscribe(ctx, &paho.Subscribe{
+ Subscriptions: subscriptions,
+ Properties: subProps,
+ }); err != nil {
+ logger.Error("Failed to subscribe", "error", err)
+ } else {
+ logger.Info("Subscribed to topics",
+ "topics", cfg.Topics,
+ "qos", cfg.QoS)
+ }
+ }
+ }
+
+ cliCfg.OnConnectError = func(err error) {
+ logger.Error("MQTT connection error", "error", err)
+ }
+
+ // Connect to broker
+ mqttClient, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ // Wait for connection
+ err = mqttClient.AwaitConnection(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to establish MQTT connection: %w", err)
+ }
+
+ defer func() {
+ discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := mqttClient.Disconnect(discCtx); err != nil {
+ logger.Error("Failed to disconnect MQTT", "error", err)
+ }
+ }()
+
+ logger.Info("Subscriber started and waiting for messages...")
+
+ // Statistics reporting goroutine
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Print final statistics
+ statsMutex.RLock()
+ finalStats := stats.MessagesReceived
+ statsMutex.RUnlock()
+
+ if len(finalStats) > 0 {
+ logger.Info("Final message statistics", "stats", finalStats)
+ }
+ return nil
+
+ case <-ticker.C:
+ if cfg.EnableDebug {
+ statsMutex.RLock()
+ currentStats := stats.MessagesReceived
+ statsMutex.RUnlock()
+
+ if len(currentStats) > 0 {
+ logger.Debug("Message statistics", "stats", currentStats)
+ }
+ }
+ }
+ }
+}
+
+func handleMessage(pkt *paho.Publish, logger *slog.Logger,
+ stats *weather.TopicStats, mutex *sync.RWMutex) {
+
+ // Update statistics
+ mutex.Lock()
+ stats.MessagesReceived[pkt.Topic]++
+ stats.LastMessageTime[pkt.Topic] = time.Now()
+ mutex.Unlock()
+
+ // Route based on topic pattern
+ switch {
+ case strings.HasPrefix(pkt.Topic, "weather/obs/"):
+ processObservationMessage(pkt, logger)
+ case strings.HasPrefix(pkt.Topic, "weather/forecast/"):
+ processForecastMessage(pkt, logger)
+ default:
+ logger.Warn("Unknown topic pattern", "topic", pkt.Topic)
+ // Try to decode as generic JSON
+ var data map[string]interface{}
+ if err := json.Unmarshal(pkt.Payload, &data); err == nil {
+ logger.Info("Generic message", "topic", pkt.Topic, "data", data)
+ } else {
+ logger.Info("Raw message", "topic", pkt.Topic, "payload", string(pkt.Payload))
+ }
+ }
+}
+
+func processObservationMessage(pkt *paho.Publish, logger *slog.Logger) {
+ var obs weather.Observation
+ if err := json.Unmarshal(pkt.Payload, &obs); err != nil {
+ logger.Error("Failed to parse observation",
+ "error", err,
+ "topic", pkt.Topic)
+ return
+ }
+
+ // Extract user properties
+ userProps := make(map[string]string)
+ if pkt.Properties != nil {
+ for _, prop := range pkt.Properties.User {
+ userProps[prop.Key] = prop.Value
+ }
+ }
+
+ // Format value for display
+ valueStr := "null"
+ if obs.Value != nil {
+ valueStr = fmt.Sprintf("%.2f", *obs.Value)
+ }
+
+ logger.Info("Observation received",
+ "topic", pkt.Topic,
+ "station", obs.Station,
+ "parameter", obs.Parameter,
+ "time", obs.Time.Format(time.RFC3339),
+ "value", valueStr,
+ "user_properties", userProps,
+ "retain", pkt.Retain,
+ "qos", pkt.QoS)
+}
+
+func processForecastMessage(pkt *paho.Publish, logger *slog.Logger) {
+ var fc weather.ForecastValue
+ if err := json.Unmarshal(pkt.Payload, &fc); err != nil {
+ logger.Error("Failed to parse forecast",
+ "error", err,
+ "topic", pkt.Topic)
+ return
+ }
+
+ // Extract user properties
+ userProps := make(map[string]string)
+ if pkt.Properties != nil {
+ for _, prop := range pkt.Properties.User {
+ userProps[prop.Key] = prop.Value
+ }
+ }
+
+ // Format value for display
+ valueStr := "null"
+ if fc.Value != nil {
+ valueStr = fmt.Sprintf("%.2f", *fc.Value)
+ }
+
+ logger.Info("Forecast received",
+ "topic", pkt.Topic,
+ "model", fc.Model,
+ "parameter", fc.Parameter,
+ "run_time", fc.RunTime.Format(time.RFC3339),
+ "forecast_time", fc.ForecastTime.Format(time.RFC3339),
+ "location", fmt.Sprintf("(%.4f,%.4f)", fc.Location.Lat, fc.Location.Lon),
+ "value", valueStr,
+ "user_properties", userProps,
+ "retain", pkt.Retain,
+ "qos", pkt.QoS)
+}
diff --git a/go.mod b/go.mod
index 724ee2b..41a0150 100644
--- a/go.mod
+++ b/go.mod
@@ -1,4 +1,4 @@
-module pub
+module hub
go 1.25.5
diff --git a/internal/weather/config.go b/internal/weather/config.go
new file mode 100644
index 0000000..6450da4
--- /dev/null
+++ b/internal/weather/config.go
@@ -0,0 +1,102 @@
+package weather
+
+import (
+ "time"
+)
+
+// PublisherConfig configuration for the publisher
+type PublisherConfig struct {
+ // FMI API
+ FMIEndpoint string `env:"FMI_ENDPOINT"`
+
+ // Observation
+ StationID int `env:"STATION_ID"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
+ ObsTimestep int `env:"OBS_TIMESTEP"`
+ WatermarkFile string `env:"WATERMARK_FILE"`
+
+ // Forecast
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
+ HelLat float64 `env:"HEL_LAT"`
+ HelLon float64 `env:"HEL_LON"`
+
+ // MQTT
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
+}
+
+// SubscriberConfig configuration for the subscriber
+type SubscriberConfig struct {
+ // MQTT Configuration
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Subscription Configuration
+ Topics []string `env:"SUBSCRIBE_TOPICS"`
+ QoS int `env:"SUBSCRIBE_QOS"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ EnableDebug bool `env:"ENABLE_DEBUG"`
+}
+
+// LoadPublisherConfig loads publisher configuration from environment
+func LoadPublisherConfig() *PublisherConfig {
+ cfg := &PublisherConfig{
+ FMIEndpoint: GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: ParseInt(GetEnv("STATION_ID", "100968")),
+ ObsPollEvery: ParseDuration(GetEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: ParseDuration(GetEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: ParseInt(GetEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: GetEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: ParseDuration(GetEnv("FC_POLL_EVERY", "1h")),
+ HelLat: ParseFloat(GetEnv("HEL_LAT", "60.1699")),
+ HelLon: ParseFloat(GetEnv("HEL_LON", "24.9384")),
+ MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")),
+ LogLevel: GetEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: ParseDuration(GetEnv("HTTP_TIMEOUT", "30s")),
+ }
+
+ return cfg
+}
+
+// LoadSubscriberConfig loads subscriber configuration from environment
+func LoadSubscriberConfig() *SubscriberConfig {
+ qos := ParseInt(GetEnv("SUBSCRIBE_QOS", "1"))
+ if qos < 0 || qos > 2 {
+ qos = 1
+ }
+
+ cfg := &SubscriberConfig{
+ MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-subscriber"),
+ MQTTUsername: GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")),
+ Topics: ParseTopics(GetEnv("SUBSCRIBE_TOPICS", "")),
+ QoS: qos,
+ LogLevel: GetEnv("LOG_LEVEL", "info"),
+ EnableDebug: GetEnv("ENABLE_DEBUG", "false") == "true",
+ }
+
+ return cfg
+}
diff --git a/internal/weather/fmi.go b/internal/weather/fmi.go
new file mode 100644
index 0000000..52c9f63
--- /dev/null
+++ b/internal/weather/fmi.go
@@ -0,0 +1,258 @@
+package weather
+
+import (
+ "bytes"
+ "context"
+ "encoding/xml"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// BuildObservationsURL constructs the URL for fetching observations from FMI.
+func BuildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
+ q.Set("fmisid", fmt.Sprintf("%d", stationID))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+ q.Set("timestep", fmt.Sprintf("%d", timestep))
+
+ return endpoint + "?" + q.Encode()
+}
+
+// BuildForecastURL constructs the URL for fetching forecasts from FMI.
+func BuildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
+ q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+
+ return endpoint + "?" + q.Encode()
+}
+
+// FetchData fetches data from the given URL with a timeout.
+func FetchData(url string, timeout time.Duration) ([]byte, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Accept", "application/xml")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
+ }
+
+ return io.ReadAll(resp.Body)
+}
+
+// ParseObservations parses the XML response from FMI observations query.
+func ParseObservations(data []byte, stationID int, minTime time.Time) ([]Observation, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
+
+ var (
+ param string
+ ts time.Time
+ maxObserved = minTime
+ out []Observation
+ inTVP bool
+ inTime bool
+ inValue bool
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, maxObserved, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
+ }
+ }
+ case "MeasurementTVP":
+ inTVP = true
+ ts = time.Time{}
+ case "time":
+ if inTVP {
+ inTime = true
+ }
+ case "value":
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ ts = parsedTime
+ }
+ } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
+ }
+ }
+
+ out = append(out, Observation{
+ Station: stationID,
+ Parameter: param,
+ Time: ts,
+ Value: valuePtr,
+ })
+
+ if ts.After(maxObserved) {
+ maxObserved = ts
+ }
+ }
+
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
+ }
+
+ return out, maxObserved, nil
+}
+
+// ParseForecast parses the XML response from FMI forecast query.
+func ParseForecast(data []byte, lat, lon float64) ([]ForecastValue, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
+
+ var (
+ param string
+ runTime time.Time
+ fcTime time.Time
+ out []ForecastValue
+ inTVP bool
+ inTime bool
+ inValue bool
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, runTime, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "resultTime":
+ var s string
+ if err := dec.DecodeElement(&s, &el); err == nil {
+ if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil {
+ runTime = parsedTime
+ }
+ }
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
+ }
+ }
+ case "MeasurementTVP":
+ inTVP = true
+ fcTime = time.Time{}
+ case "time":
+ if inTVP {
+ inTime = true
+ }
+ case "value":
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ fcTime = parsedTime
+ }
+ } else if inValue && inTVP && !fcTime.IsZero() {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
+ }
+ }
+
+ fc := ForecastValue{
+ Location: struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ }{lat, lon},
+ Model: "harmonie",
+ RunTime: runTime,
+ ForecastTime: fcTime,
+ Parameter: param,
+ Value: valuePtr,
+ }
+ out = append(out, fc)
+ }
+
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
+ }
+
+ return out, runTime, nil
+}
diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go
new file mode 100644
index 0000000..211b441
--- /dev/null
+++ b/internal/weather/mqtt.go
@@ -0,0 +1,76 @@
+package weather
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "net/url"
+ "os"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+)
+
+// MQTTConfig holds MQTT connection configuration
+type MQTTConfig struct {
+ Broker string
+ ClientID string
+ Username string
+ Password string
+ KeepAlive time.Duration
+ SessionExpiry time.Duration
+ Timeout time.Duration
+ OnPublishReceived []func(paho.PublishReceived) (bool, error)
+}
+
+// CreateMQTTClient creates a MQTT client with the given configuration
+func CreateMQTTClient(ctx context.Context, cfg MQTTConfig,
+ onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) {
+
+ serverURL, err := url.Parse(cfg.Broker)
+ if err != nil {
+ return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.KeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()),
+ OnPublishReceived: cfg.OnPublishReceived,
+ OnServerDisconnect: func(d *paho.Disconnect) {
+
+ // Default disconnect handler
+ },
+ },
+ ConnectUsername: cfg.Username,
+ ConnectPassword: []byte(cfg.Password),
+ ConnectTimeout: cfg.Timeout,
+ }
+
+ cliCfg.CleanStartOnInitialConnection = false
+ cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds())
+
+ if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
+ cliCfg.TlsCfg = &tls.Config{
+ MinVersion: tls.VersionTLS12,
+ }
+ }
+
+ if onConnect != nil {
+ cliCfg.OnConnectionUp = onConnect
+ }
+
+ cm, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ err = cm.AwaitConnection(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
+ }
+
+ return cm, nil
+}
diff --git a/internal/weather/types.go b/internal/weather/types.go
new file mode 100644
index 0000000..5563bfa
--- /dev/null
+++ b/internal/weather/types.go
@@ -0,0 +1,44 @@
+package weather
+
+import (
+ "encoding/json"
+ "math"
+ "time"
+)
+
+// Observation represents a weather observation from a station
+type Observation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value *float64 `json:"value,omitempty"`
+}
+
+// ForecastValue represents a weather forecast value
+type ForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value *float64 `json:"value,omitempty"`
+}
+
+// JSONFloat64 handles NaN values properly in JSON
+type JSONFloat64 float64
+
+func (f JSONFloat64) MarshalJSON() ([]byte, error) {
+ if math.IsNaN(float64(f)) {
+ return []byte("null"), nil
+ }
+ return json.Marshal(float64(f))
+}
+
+// TopicStats tracks message statistics for subscribers
+type TopicStats struct {
+ MessagesReceived map[string]int
+ LastMessageTime map[string]time.Time
+}
diff --git a/internal/weather/utils.go b/internal/weather/utils.go
new file mode 100644
index 0000000..b59949a
--- /dev/null
+++ b/internal/weather/utils.go
@@ -0,0 +1,82 @@
+// internal/weather/utils.go
+package weather
+
+import (
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// GetEnv gets environment variable with default
+func GetEnv(key, defaultValue string) string {
+ if value := os.Getenv(key); value != "" {
+ return value
+ }
+ return defaultValue
+}
+
+// ParseDuration parses duration string
+func ParseDuration(s string) time.Duration {
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return 5 * time.Minute
+ }
+ return d
+}
+
+// ParseInt parses integer string
+func ParseInt(s string) int {
+ i, err := strconv.Atoi(s)
+ if err != nil {
+ return 0
+ }
+ return i
+}
+
+// ParseFloat parses float string
+func ParseFloat(s string) float64 {
+ f, err := strconv.ParseFloat(s, 64)
+ if err != nil {
+ return 0
+ }
+ return f
+}
+
+// ParseTopics parses comma-separated topics
+func ParseTopics(s string) []string {
+ if s == "" {
+ return []string{
+ "weather/obs/fmi/#",
+ "weather/forecast/fmi/#",
+ }
+ }
+ return strings.Split(s, ",")
+}
+
+// LoadWatermark loads last observation time from file
+func LoadWatermark(filename string) time.Time {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ return time.Unix(sec, 0).UTC()
+}
+
+// StoreWatermark stores observation time to file
+func StoreWatermark(filename string, t time.Time) error {
+ data := []byte(strconv.FormatInt(t.Unix(), 10))
+ return os.WriteFile(filename, data, 0644)
+}
+
+// InitTopicStats initializes topic statistics
+func InitTopicStats() *TopicStats {
+ return &TopicStats{
+ MessagesReceived: make(map[string]int),
+ LastMessageTime: make(map[string]time.Time),
+ }
+}
diff --git a/main.go b/main.go
deleted file mode 100644
index c512022..0000000
--- a/main.go
+++ /dev/null
@@ -1,792 +0,0 @@
-package main
-
-import (
- "bytes"
- "context"
- "crypto/tls"
- "encoding/json"
- "encoding/xml"
- "fmt"
- "io"
- "log/slog"
- "math"
- "net/http"
- "net/url"
- "os"
- "strconv"
- "strings"
- "time"
-
- "github.com/eclipse/paho.golang/autopaho"
- "github.com/eclipse/paho.golang/paho"
- "github.com/joho/godotenv"
-)
-
-type Config struct {
- // FMI API
- FMIEndpoint string `env:"FMI_ENDPOINT"`
-
- // Observation
- StationID int `env:"STATION_ID"`
- ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
- ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
- ObsTimestep int `env:"OBS_TIMESTEP"`
- WatermarkFile string `env:"WATERMARK_FILE"`
-
- // Forecast
- FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
- HelLat float64 `env:"HEL_LAT"`
- HelLon float64 `env:"HEL_LON"`
-
- // MQTT
- MQTTBroker string `env:"MQTT_BROKER"`
- MQTTClientID string `env:"MQTT_CLIENT_ID"`
- MQTTUsername string `env:"MQTT_USERNAME"`
- MQTTPassword string `env:"MQTT_PASSWORD"`
- MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
- MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
-
- // Application
- LogLevel string `env:"LOG_LEVEL"`
- HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
-}
-
-type Observation struct {
- Station int `json:"station"`
- Parameter string `json:"parameter"`
- Time time.Time `json:"time"`
- Value *float64 `json:"value,omitempty"`
-}
-
-type ForecastValue struct {
- Location struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- } `json:"location"`
-
- Model string `json:"model"`
- RunTime time.Time `json:"run_time"`
- ForecastTime time.Time `json:"forecast_time"`
- Parameter string `json:"parameter"`
- Value *float64 `json:"value,omitempty"`
-}
-
-type JSONFloat64 float64
-
-func (f JSONFloat64) MarshalJSON() ([]byte, error) {
- if math.IsNaN(float64(f)) {
- return []byte("null"), nil
- }
- return json.Marshal(float64(f))
-}
-
-func loadConfig() *Config {
- // Load .env file if it exists
- _ = godotenv.Load()
-
- cfg := &Config{
- FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
- StationID: parseInt(getEnv("STATION_ID", "100968")),
- ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")),
- ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")),
- ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")),
- WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"),
- FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
- HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
- HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
- MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
- MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"),
- MQTTUsername: getEnv("MQTT_USERNAME", ""),
- MQTTPassword: getEnv("MQTT_PASSWORD", ""),
- MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")),
- MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")),
- LogLevel: getEnv("LOG_LEVEL", "debug"),
- HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
- }
-
- return cfg
-}
-
-func getEnv(key, defaultValue string) string {
- if value := os.Getenv(key); value != "" {
- return value
- }
- return defaultValue
-}
-
-func parseDuration(s string) time.Duration {
- d, err := time.ParseDuration(s)
- if err != nil {
- return 5 * time.Minute // Default fallback
- }
- return d
-}
-
-func parseInt(s string) int {
- i, err := strconv.Atoi(s)
- if err != nil {
- return 0
- }
- return i
-}
-
-func parseFloat(s string) float64 {
- f, err := strconv.ParseFloat(s, 64)
- if err != nil {
- return 0
- }
- return f
-}
-
-func parseLogLevel(level string) slog.Level {
- switch strings.ToLower(level) {
- case "debug":
- return slog.LevelDebug
- case "warn", "warning":
- return slog.LevelWarn
- case "error":
- return slog.LevelError
- default:
- return slog.LevelInfo
- }
-}
-
-func loadWatermark(filename string) time.Time {
- b, err := os.ReadFile(filename)
- if err != nil {
- return time.Now().UTC().Add(-24 * time.Hour)
- }
- sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
- if err != nil {
- return time.Now().UTC().Add(-24 * time.Hour)
- }
- return time.Unix(sec, 0).UTC()
-}
-
-func storeWatermark(filename string, t time.Time) error {
- data := []byte(fmt.Sprintf("%d", t.Unix()))
- return os.WriteFile(filename, data, 0644)
-}
-
-func fetchData(url string, timeout time.Duration) ([]byte, error) {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
-
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
- if err != nil {
- return nil, err
- }
- req.Header.Set("Accept", "application/xml")
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
- }
-
- return io.ReadAll(resp.Body)
-}
-
-func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
- q := url.Values{}
- q.Set("service", "WFS")
- q.Set("version", "2.0.0")
- q.Set("request", "GetFeature")
- q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
- q.Set("fmisid", fmt.Sprintf("%d", stationID))
- q.Set("starttime", start.Format(time.RFC3339))
- q.Set("endtime", end.Format(time.RFC3339))
- q.Set("timestep", fmt.Sprintf("%d", timestep))
-
- return endpoint + "?" + q.Encode()
-}
-
-func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) {
- dec := xml.NewDecoder(bytes.NewReader(data))
-
- var (
- param string
- ts time.Time
- maxObserved = minTime
- out []Observation
- inTVP bool
- inTime bool
- inValue bool
- )
-
- for {
- tok, err := dec.Token()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, maxObserved, err
- }
-
- switch el := tok.(type) {
- case xml.StartElement:
- switch el.Name.Local {
- case "MeasurementTimeseries":
- for _, a := range el.Attr {
- if a.Name.Local == "id" {
- parts := strings.Split(a.Value, "-")
- if len(parts) > 0 {
- param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
- }
- }
- }
- case "MeasurementTVP":
- inTVP = true
- ts = time.Time{}
- case "time":
- if inTVP {
- inTime = true
- }
- case "value":
- if inTVP {
- inValue = true
- }
- }
-
- case xml.CharData:
- if inTime && inTVP {
- s := strings.TrimSpace(string(el))
- if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
- ts = parsedTime
- }
- } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) {
- s := strings.TrimSpace(string(el))
- var valuePtr *float64
-
- if !strings.EqualFold(s, "NaN") && s != "" {
- if value, err := strconv.ParseFloat(s, 64); err == nil {
- valuePtr = &value
- }
- }
-
- out = append(out, Observation{
- Station: stationID,
- Parameter: param,
- Time: ts,
- Value: valuePtr,
- })
-
- if ts.After(maxObserved) {
- maxObserved = ts
- }
- }
-
- case xml.EndElement:
- switch el.Name.Local {
- case "MeasurementTVP":
- inTVP = false
- case "time":
- inTime = false
- case "value":
- inValue = false
- }
- }
- }
-
- return out, maxObserved, nil
-}
-
-func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
- q := url.Values{}
- q.Set("service", "WFS")
- q.Set("version", "2.0.0")
- q.Set("request", "GetFeature")
- q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
- q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon))
- q.Set("starttime", start.Format(time.RFC3339))
- q.Set("endtime", end.Format(time.RFC3339))
-
- return endpoint + "?" + q.Encode()
-}
-
-func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) {
- dec := xml.NewDecoder(bytes.NewReader(data))
-
- var (
- param string
- runTime time.Time
- fcTime time.Time
- out []ForecastValue
- inTVP bool
- inTime bool
- inValue bool
- )
-
- for {
- tok, err := dec.Token()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, runTime, err
- }
-
- switch el := tok.(type) {
- case xml.StartElement:
- switch el.Name.Local {
- case "resultTime":
- var s string
- if err := dec.DecodeElement(&s, &el); err == nil {
- if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil {
- runTime = parsedTime
- }
- }
- case "MeasurementTimeseries":
- for _, a := range el.Attr {
- if a.Name.Local == "id" {
- parts := strings.Split(a.Value, "-")
- if len(parts) > 0 {
- param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
- }
- }
- }
- case "MeasurementTVP":
- inTVP = true
- fcTime = time.Time{}
- case "time":
- if inTVP {
- inTime = true
- }
- case "value":
- if inTVP {
- inValue = true
- }
- }
-
- case xml.CharData:
- if inTime && inTVP {
- s := strings.TrimSpace(string(el))
- if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
- fcTime = parsedTime
- }
- } else if inValue && inTVP && !fcTime.IsZero() {
- s := strings.TrimSpace(string(el))
- var valuePtr *float64
-
- if !strings.EqualFold(s, "NaN") && s != "" {
- if value, err := strconv.ParseFloat(s, 64); err == nil {
- valuePtr = &value
- }
- }
-
- fc := ForecastValue{
- Location: struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- }{lat, lon},
- Model: "harmonie",
- RunTime: runTime,
- ForecastTime: fcTime,
- Parameter: param,
- Value: valuePtr,
- }
- out = append(out, fc)
- }
-
- case xml.EndElement:
- switch el.Name.Local {
- case "MeasurementTVP":
- inTVP = false
- case "time":
- inTime = false
- case "value":
- inValue = false
- }
- }
- }
-
- return out, runTime, nil
-}
-
-func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) {
- serverURL, err := url.Parse(cfg.MQTTBroker)
- if err != nil {
- return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
- }
-
- // Client configuration
- cliCfg := autopaho.ClientConfig{
- ServerUrls: []*url.URL{serverURL},
- KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()),
- ClientConfig: paho.ClientConfig{
- ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()),
- OnPublishReceived: []func(paho.PublishReceived) (bool, error){
- func(pr paho.PublishReceived) (bool, error) {
- logger.Debug("Received message", "topic", pr.Packet.Topic)
- return true, nil
- },
- },
- // Set OnDisconnect here
- OnServerDisconnect: func(d *paho.Disconnect) {
- if d != nil {
- logger.Warn("MQTT disconnected", "reason", d.ReasonCode)
- } else {
- logger.Info("MQTT disconnected gracefully")
- }
- },
- },
- ConnectUsername: cfg.MQTTUsername,
- ConnectPassword: []byte(cfg.MQTTPassword),
- ConnectTimeout: cfg.HTTPTimeout,
- }
-
- // MQTT v5 Connect properties - these need to be pointers
- cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds())
- cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())
- cliCfg.CleanStartOnInitialConnection = false
-
- // Last Will and Testament (LWT)
- cliCfg.WillMessage = &paho.WillMessage{
- Topic: "weather/clients/status",
- Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)),
- QoS: 1,
- Retain: true,
- }
-
- // TLS configuration (if using TLS)
- if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
- cliCfg.TlsCfg = &tls.Config{
- MinVersion: tls.VersionTLS12,
- }
- }
-
- // Callbacks
- cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
- logger.Info("MQTT v5 connected",
- "broker", serverURL.Host,
- "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil,
- "keep_alive", cfg.MQTTKeepAlive)
- }
-
- cliCfg.OnConnectError = func(err error) {
- logger.Error("MQTT connection error", "error", err)
- }
-
- // Connect to broker
- cm, err := autopaho.NewConnection(ctx, cliCfg)
- if err != nil {
- return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
- }
-
- // Wait for connection to be established
- err = cm.AwaitConnection(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
- }
-
- return cm, nil
-}
-
-func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error {
- topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
-
- // Safe JSON structure
- type SafeObservation struct {
- Station int `json:"station"`
- Parameter string `json:"parameter"`
- Time time.Time `json:"time"`
- Value *JSONFloat64 `json:"value,omitempty"`
- }
-
- var safeValue *JSONFloat64
- if obs.Value != nil {
- v := JSONFloat64(*obs.Value)
- safeValue = &v
- }
-
- safeObs := SafeObservation{
- Station: obs.Station,
- Parameter: obs.Parameter,
- Time: obs.Time,
- Value: safeValue,
- }
-
- b, err := json.Marshal(safeObs)
- if err != nil {
- return err
- }
-
- // MQTT v5 publish properties
- props := &paho.PublishProperties{
- User: []paho.UserProperty{
- {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)},
- {Key: "parameter", Value: obs.Parameter},
- {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)},
- {Key: "data_type", Value: "observation"},
- {Key: "source", Value: "fmi"},
- },
- }
-
- pb := &paho.Publish{
- Topic: topic,
- QoS: 1,
- Retain: false,
- Payload: b,
- Properties: props,
- }
-
- // Publish with context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- _, err = cm.Publish(ctx, pb)
- if err != nil {
- return fmt.Errorf("failed to publish observation: %w", err)
- }
-
- logger.Debug("Published observation",
- "topic", topic,
- "station", obs.Station,
- "parameter", obs.Parameter,
- "value", obs.Value,
- "time", obs.Time)
-
- return nil
-}
-
-func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error {
- topic := fmt.Sprintf(
- "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
- fc.RunTime.Format(time.RFC3339),
- fc.Parameter,
- )
-
- // Safe JSON structure
- type SafeForecastValue struct {
- Location struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- } `json:"location"`
- Model string `json:"model"`
- RunTime time.Time `json:"run_time"`
- ForecastTime time.Time `json:"forecast_time"`
- Parameter string `json:"parameter"`
- Value *JSONFloat64 `json:"value,omitempty"`
- }
-
- var safeValue *JSONFloat64
- if fc.Value != nil {
- v := JSONFloat64(*fc.Value)
- safeValue = &v
- }
-
- safeFc := SafeForecastValue{
- Location: fc.Location,
- Model: fc.Model,
- RunTime: fc.RunTime,
- ForecastTime: fc.ForecastTime,
- Parameter: fc.Parameter,
- Value: safeValue,
- }
-
- b, err := json.Marshal(safeFc)
- if err != nil {
- return err
- }
-
- // MQTT v5 publish properties
- props := &paho.PublishProperties{
- User: []paho.UserProperty{
- {Key: "model", Value: fc.Model},
- {Key: "parameter", Value: fc.Parameter},
- {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)},
- {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)},
- {Key: "data_type", Value: "forecast"},
- {Key: "source", Value: "fmi"},
- {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)},
- {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)},
- },
- }
-
- pb := &paho.Publish{
- Topic: topic,
- QoS: 1,
- Retain: true, // Forecasts are retained since they don't change
- Payload: b,
- Properties: props,
- }
-
- // Publish with context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- _, err = cm.Publish(ctx, pb)
- if err != nil {
- return fmt.Errorf("failed to publish forecast: %w", err)
- }
-
- logger.Debug("Published forecast",
- "topic", topic,
- "parameter", fc.Parameter,
- "run_time", fc.RunTime,
- "forecast_time", fc.ForecastTime)
-
- return nil
-}
-
-func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
- // Load initial state
- lastObs := loadWatermark(cfg.WatermarkFile)
- lastFcRun := time.Time{}
-
- // Create MQTT client
- mqttClient, err := createMQTTClient(ctx, cfg, logger)
- if err != nil {
- return fmt.Errorf("failed to connect to MQTT: %w", err)
- }
- defer func() {
- // Graceful disconnect
- discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- if err := mqttClient.Disconnect(discCtx); err != nil {
- logger.Error("Failed to disconnect MQTT", "error", err)
- }
- }()
-
- // Create timers
- obsTimer := time.NewTimer(0) // Start immediately
- fcTimer := time.NewTimer(0) // Start immediately
-
- defer obsTimer.Stop()
- defer fcTimer.Stop()
-
- logger.Info("Poller started",
- "obs_interval", cfg.ObsPollEvery,
- "fc_interval", cfg.FcPollEvery,
- "mqtt_broker", cfg.MQTTBroker)
-
- for {
- select {
- case <-ctx.Done():
- logger.Info("Poller stopping")
- return nil
-
- case <-obsTimer.C:
- // Poll observations
- obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag)
- if obsEnd.After(lastObs) {
- logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
-
- url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
- data, err := fetchData(url, cfg.HTTPTimeout)
- if err != nil {
- logger.Error("Failed to fetch observations", "error", err)
- } else {
- obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger)
-
- if err != nil {
- logger.Error("Failed to parse observations", "error", err)
- } else if len(obs) > 0 {
- // Publish observations
- successCount := 0
- for _, o := range obs {
- if err := publishObservation(mqttClient, o, logger); err != nil {
- logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter)
- } else {
- successCount++
- }
- }
-
- // Update watermark
- lastObs = maxT
- if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil {
- logger.Error("Failed to store watermark", "error", err)
- }
-
- logger.Info("Published observations",
- "successful", successCount,
- "total", len(obs),
- "new_watermark", lastObs)
- } else {
- logger.Debug("No new observations found")
- }
- }
- }
- obsTimer.Reset(cfg.ObsPollEvery)
-
- case <-fcTimer.C:
- // Poll forecast
- now := time.Now().UTC()
- start := now
- end := now.Add(48 * time.Hour)
-
- logger.Info("Polling forecast", "from", start, "to", end)
-
- url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
- data, err := fetchData(url, cfg.HTTPTimeout)
- if err != nil {
- logger.Error("Failed to fetch forecast", "error", err)
- } else {
- fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger)
-
- if err != nil {
- logger.Error("Failed to parse forecast", "error", err)
- } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
- // Publish forecast
- successCount := 0
- for _, f := range fc {
- if err := publishForecast(mqttClient, f, logger); err != nil {
- logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter)
- } else {
- successCount++
- }
- }
-
- lastFcRun = runTime
- logger.Info("Published forecast",
- "successful", successCount,
- "total", len(fc),
- "run_time", runTime)
- } else {
- logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun)
- }
- }
- fcTimer.Reset(cfg.FcPollEvery)
- }
- }
-}
-
-func main() {
- // Load configuration
- cfg := loadConfig()
-
- // Setup logging
- logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
- Level: parseLogLevel(cfg.LogLevel),
- }))
-
- // Create context for graceful shutdown
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Setup signal handling for graceful shutdown
- sigChan := make(chan os.Signal, 1)
- // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-
- go func() {
- // For now, we don't handle signals in this simplified version
- // In production, you'd want to handle SIGINT and SIGTERM
- <-sigChan
- logger.Info("Received shutdown signal")
- cancel()
- }()
-
- // Run the poller
- logger.Info("Starting weather data poller with MQTT v5")
- if err := runPoller(ctx, cfg, logger); err != nil {
- logger.Error("Poller failed", "error", err)
- os.Exit(1)
- }
-
- logger.Info("Poller stopped gracefully")
-}
diff --git a/pub.service b/pub.service
index 34eb743..9cfa51f 100644
--- a/pub.service
+++ b/pub.service
@@ -1,4 +1,4 @@
-# /etc/systemd/system/atom-publisher.service
+# /etc/systemd/system/pub.service
[Unit]
Description=Data Publication HUB
After=network.target
@@ -10,7 +10,7 @@ Type=simple
Restart=always
RestartSec=10
User=publisher
-ExecStart=/usr/local/bin/hub
+ExecStart=/usr/local/bin/pub
MemoryHigh=20M
MemoryMax=30M