diff options
| author | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-03 18:59:27 +0200 |
|---|---|---|
| committer | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-03 18:59:27 +0200 |
| commit | b1191e28c7d48ba96d07942ff609f618589280d2 (patch) | |
| tree | c02e35d3439d164970db88d679d21697f31f7f9d /main.go | |
| parent | 9b111ad0c55addfef2a47bb9ae350decb28280c0 (diff) | |
| download | pub-b1191e28c7d48ba96d07942ff609f618589280d2.tar.zst | |
Update
Diffstat (limited to '')
| -rw-r--r-- | main.go | 347 |
1 files changed, 230 insertions, 117 deletions
@@ -1,7 +1,9 @@ package main import ( + "bytes" "context" + "crypto/tls" "encoding/json" "encoding/xml" "fmt" @@ -15,14 +17,11 @@ import ( "strings" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" "github.com/joho/godotenv" ) -/* ============================================================ - Configuration - ============================================================ */ - type Config struct { // FMI API FMIEndpoint string `env:"FMI_ENDPOINT"` @@ -40,20 +39,18 @@ type Config struct { HelLon float64 `env:"HEL_LON"` // MQTT - MQTTBroker string `env:"MQTT_BROKER"` - MQTTClientID string `env:"MQTT_CLIENT_ID"` - MQTTUsername string `env:"MQTT_USERNAME"` - MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` // Application LogLevel string `env:"LOG_LEVEL"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` } -/* ============================================================ - Domain models - ============================================================ */ - type Observation struct { Station int `json:"station"` Parameter string `json:"parameter"` @@ -74,10 +71,6 @@ type ForecastValue struct { Value *float64 `json:"value,omitempty"` } -/* ============================================================ - JSON Float for NaN handling - ============================================================ */ - type JSONFloat64 float64 func (f JSONFloat64) MarshalJSON() ([]byte, error) { @@ -87,30 +80,28 @@ func (f JSONFloat64) MarshalJSON() ([]byte, error) { return json.Marshal(float64(f)) } -/* ============================================================ - Load Configuration - ============================================================ */ - func loadConfig() *Config { // Load .env file if it exists _ = godotenv.Load() cfg := &Config{ - FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), - StationID: parseInt(getEnv("STATION_ID", "100968")), - ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), - ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), - ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), - WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), - FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), - HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), - HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), - MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), - MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), - MQTTUsername: getEnv("MQTT_USERNAME", ""), - MQTTPassword: getEnv("MQTT_PASSWORD", ""), - LogLevel: getEnv("LOG_LEVEL", "info"), - HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), + FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: parseInt(getEnv("STATION_ID", "100968")), + ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), + WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), + HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), + HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), + MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: getEnv("MQTT_USERNAME", ""), + MQTTPassword: getEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")), + LogLevel: getEnv("LOG_LEVEL", "info"), + HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), } return cfg @@ -160,10 +151,6 @@ func parseLogLevel(level string) slog.Level { } } -/* ============================================================ - Watermark persistence - ============================================================ */ - func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { @@ -181,11 +168,7 @@ func storeWatermark(filename string, t time.Time) error { return os.WriteFile(filename, data, 0644) } -/* ============================================================ - HTTP Fetch - ============================================================ */ - -func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) { +func fetchData(url string, timeout time.Duration) ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -199,19 +182,15 @@ func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) { if err != nil { return nil, err } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - resp.Body.Close() return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) } - return resp.Body, nil + return io.ReadAll(resp.Body) } -/* ============================================================ - Observation Polling - ============================================================ */ - func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { q := url.Values{} q.Set("service", "WFS") @@ -226,8 +205,8 @@ func buildObservationsURL(endpoint string, stationID int, start, end time.Time, return endpoint + "?" + q.Encode() } -func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { - dec := xml.NewDecoder(r) +func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) var ( param string @@ -316,10 +295,6 @@ func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *sl return out, maxObserved, nil } -/* ============================================================ - Forecast Polling - ============================================================ */ - func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { q := url.Values{} q.Set("service", "WFS") @@ -333,8 +308,8 @@ func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) s return endpoint + "?" + q.Encode() } -func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { - dec := xml.NewDecoder(r) +func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) var ( param string @@ -432,38 +407,86 @@ func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]Foreca return out, runTime, nil } -/* ============================================================ - MQTT Publishing - ============================================================ */ +func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) { + serverURL, err := url.Parse(cfg.MQTTBroker) + if err != nil { + return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + // Client configuration + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), + OnPublishReceived: []func(paho.PublishReceived) (bool, error){ + func(pr paho.PublishReceived) (bool, error) { + logger.Debug("Received message", "topic", pr.Packet.Topic) + return true, nil + }, + }, + // Set OnDisconnect here + OnServerDisconnect: func(d *paho.Disconnect) { + if d != nil { + logger.Warn("MQTT disconnected", "reason", d.ReasonCode) + } else { + logger.Info("MQTT disconnected gracefully") + } + }, + }, + ConnectUsername: cfg.MQTTUsername, + ConnectPassword: []byte(cfg.MQTTPassword), + ConnectTimeout: cfg.HTTPTimeout, + } -func createMQTTClient(cfg *Config) (mqtt.Client, error) { - opts := mqtt.NewClientOptions(). - AddBroker(cfg.MQTTBroker). - SetClientID(fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())). - SetCleanSession(true) + // MQTT v5 Connect properties - these need to be pointers + cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds()) + cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()) + cliCfg.CleanStartOnInitialConnection = false + + // Last Will and Testament (LWT) + cliCfg.WillMessage = &paho.WillMessage{ + Topic: "weather/clients/status", + Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)), + QoS: 1, + Retain: true, + } - if cfg.MQTTUsername != "" { - opts.SetUsername(cfg.MQTTUsername) - if cfg.MQTTPassword != "" { - opts.SetPassword(cfg.MQTTPassword) + // TLS configuration (if using TLS) + if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { + cliCfg.TlsCfg = &tls.Config{ + MinVersion: tls.VersionTLS12, } } - client := mqtt.NewClient(opts) - token := client.Connect() + // Callbacks + cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + logger.Info("MQTT v5 connected", + "broker", serverURL.Host, + "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil, + "keep_alive", cfg.MQTTKeepAlive) + } - if !token.WaitTimeout(cfg.HTTPTimeout) { - return nil, fmt.Errorf("MQTT connection timeout") + cliCfg.OnConnectError = func(err error) { + logger.Error("MQTT connection error", "error", err) } - if err := token.Error(); err != nil { - return nil, err + // Connect to broker + cm, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return nil, fmt.Errorf("failed to create MQTT connection: %w", err) + } + + // Wait for connection to be established + err = cm.AwaitConnection(ctx) + if err != nil { + return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) } - return client, nil + return cm, nil } -func publishObservation(client mqtt.Client, obs Observation, timeout time.Duration) error { +func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error { topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) // Safe JSON structure @@ -492,14 +515,44 @@ func publishObservation(client mqtt.Client, obs Observation, timeout time.Durati return err } - token := client.Publish(topic, 1, false, b) - if !token.WaitTimeout(timeout) { - return fmt.Errorf("publish timeout") + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, + {Key: "parameter", Value: obs.Parameter}, + {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, + {Key: "data_type", Value: "observation"}, + {Key: "source", Value: "fmi"}, + }, + } + + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: false, + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish observation: %w", err) } - return token.Error() + + logger.Debug("Published observation", + "topic", topic, + "station", obs.Station, + "parameter", obs.Parameter, + "time", obs.Time) + + return nil } -func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration) error { +func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error { topic := fmt.Sprintf( "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", fc.RunTime.Format(time.RFC3339), @@ -539,22 +592,66 @@ func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration return err } - token := client.Publish(topic, 1, true, b) - if !token.WaitTimeout(timeout) { - return fmt.Errorf("publish timeout") + // MQTT v5 publish properties + props := &paho.PublishProperties{ + User: []paho.UserProperty{ + {Key: "model", Value: fc.Model}, + {Key: "parameter", Value: fc.Parameter}, + {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, + {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, + {Key: "data_type", Value: "forecast"}, + {Key: "source", Value: "fmi"}, + {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, + {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, + }, } - return token.Error() -} -/* ============================================================ - Single Poller - ============================================================ */ + pb := &paho.Publish{ + Topic: topic, + QoS: 1, + Retain: true, // Forecasts are retained since they don't change + Payload: b, + Properties: props, + } + + // Publish with context timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = cm.Publish(ctx, pb) + if err != nil { + return fmt.Errorf("failed to publish forecast: %w", err) + } + + logger.Debug("Published forecast", + "topic", topic, + "parameter", fc.Parameter, + "run_time", fc.RunTime, + "forecast_time", fc.ForecastTime) + + return nil +} func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { // Load initial state lastObs := loadWatermark(cfg.WatermarkFile) lastFcRun := time.Time{} + // Create MQTT client + mqttClient, err := createMQTTClient(ctx, cfg, logger) + if err != nil { + return fmt.Errorf("failed to connect to MQTT: %w", err) + } + defer func() { + // Graceful disconnect + discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := mqttClient.Disconnect(discCtx); err != nil { + logger.Error("Failed to disconnect MQTT", "error", err) + } + }() + // Create timers obsTimer := time.NewTimer(0) // Start immediately fcTimer := time.NewTimer(0) // Start immediately @@ -562,16 +659,10 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { defer obsTimer.Stop() defer fcTimer.Stop() - // Create MQTT client - mqttClient, err := createMQTTClient(cfg) - if err != nil { - return fmt.Errorf("failed to connect to MQTT: %w", err) - } - defer mqttClient.Disconnect(250) - logger.Info("Poller started", "obs_interval", cfg.ObsPollEvery, - "fc_interval", cfg.FcPollEvery) + "fc_interval", cfg.FcPollEvery, + "mqtt_broker", cfg.MQTTBroker) for { select { @@ -586,20 +677,22 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { logger.Info("Polling observations", "from", lastObs, "to", obsEnd) url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) - body, err := fetchData(url, cfg.HTTPTimeout) + data, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch observations", "error", err) } else { - obs, maxT, err := parseObservations(body, cfg.StationID, lastObs, logger) - body.Close() + obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger) if err != nil { logger.Error("Failed to parse observations", "error", err) } else if len(obs) > 0 { // Publish observations + successCount := 0 for _, o := range obs { - if err := publishObservation(mqttClient, o, cfg.HTTPTimeout); err != nil { - logger.Error("Failed to publish observation", "error", err) + if err := publishObservation(mqttClient, o, logger); err != nil { + logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) + } else { + successCount++ } } @@ -609,7 +702,12 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { logger.Error("Failed to store watermark", "error", err) } - logger.Info("Published observations", "count", len(obs), "new_watermark", lastObs) + logger.Info("Published observations", + "successful", successCount, + "total", len(obs), + "new_watermark", lastObs) + } else { + logger.Debug("No new observations found") } } } @@ -624,25 +722,32 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { logger.Info("Polling forecast", "from", start, "to", end) url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) - body, err := fetchData(url, cfg.HTTPTimeout) + data, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch forecast", "error", err) } else { - fc, runTime, err := parseForecast(body, cfg.HelLat, cfg.HelLon, logger) - body.Close() + fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger) if err != nil { logger.Error("Failed to parse forecast", "error", err) } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { // Publish forecast + successCount := 0 for _, f := range fc { - if err := publishForecast(mqttClient, f, cfg.HTTPTimeout); err != nil { - logger.Error("Failed to publish forecast", "error", err) + if err := publishForecast(mqttClient, f, logger); err != nil { + logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) + } else { + successCount++ } } lastFcRun = runTime - logger.Info("Published forecast", "count", len(fc), "run_time", runTime) + logger.Info("Published forecast", + "successful", successCount, + "total", len(fc), + "run_time", runTime) + } else { + logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) } } fcTimer.Reset(cfg.FcPollEvery) @@ -650,10 +755,6 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { } } -/* ============================================================ - Main - ============================================================ */ - func main() { // Load configuration cfg := loadConfig() @@ -667,8 +768,20 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Setup signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + // For now, we don't handle signals in this simplified version + // In production, you'd want to handle SIGINT and SIGTERM + <-sigChan + logger.Info("Received shutdown signal") + cancel() + }() + // Run the poller - logger.Info("Starting weather data poller") + logger.Info("Starting weather data poller with MQTT v5") if err := runPoller(ctx, cfg, logger); err != nil { logger.Error("Poller failed", "error", err) os.Exit(1) |
