package main import ( "bytes" "context" "crypto/tls" "encoding/json" "encoding/xml" "fmt" "io" "log/slog" "math" "net/http" "net/url" "os" "strconv" "strings" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/joho/godotenv" ) type Config struct { // FMI API FMIEndpoint string `env:"FMI_ENDPOINT"` // Observation StationID int `env:"STATION_ID"` ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` ObsTimestep int `env:"OBS_TIMESTEP"` WatermarkFile string `env:"WATERMARK_FILE"` // Forecast FcPollEvery time.Duration `env:"FC_POLL_EVERY"` HelLat float64 `env:"HEL_LAT"` HelLon float64 `env:"HEL_LON"` // MQTT MQTTBroker string `env:"MQTT_BROKER"` MQTTClientID string `env:"MQTT_CLIENT_ID"` MQTTUsername string `env:"MQTT_USERNAME"` MQTTPassword string `env:"MQTT_PASSWORD"` MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` // Application LogLevel string `env:"LOG_LEVEL"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` } type Observation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value *float64 `json:"value,omitempty"` } type ForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value *float64 `json:"value,omitempty"` } type JSONFloat64 float64 func (f JSONFloat64) MarshalJSON() ([]byte, error) { if math.IsNaN(float64(f)) { return []byte("null"), nil } return json.Marshal(float64(f)) } func loadConfig() *Config { // Load .env file if it exists _ = godotenv.Load() cfg := &Config{ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), StationID: parseInt(getEnv("STATION_ID", "100968")), ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), MQTTUsername: getEnv("MQTT_USERNAME", ""), MQTTPassword: getEnv("MQTT_PASSWORD", ""), MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")), MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")), LogLevel: getEnv("LOG_LEVEL", "info"), HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), } return cfg } func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue } func parseDuration(s string) time.Duration { d, err := time.ParseDuration(s) if err != nil { return 5 * time.Minute // Default fallback } return d } func parseInt(s string) int { i, err := strconv.Atoi(s) if err != nil { return 0 } return i } func parseFloat(s string) float64 { f, err := strconv.ParseFloat(s, 64) if err != nil { return 0 } return f } func parseLogLevel(level string) slog.Level { switch strings.ToLower(level) { case "debug": return slog.LevelDebug case "warn", "warning": return slog.LevelWarn case "error": return slog.LevelError default: return slog.LevelInfo } } func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } return time.Unix(sec, 0).UTC() } func storeWatermark(filename string, t time.Time) error { data := []byte(fmt.Sprintf("%d", t.Unix())) return os.WriteFile(filename, data, 0644) } func fetchData(url string, timeout time.Duration) ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/xml") resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) } return io.ReadAll(resp.Body) } func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") q.Set("fmisid", fmt.Sprintf("%d", stationID)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) q.Set("timestep", fmt.Sprintf("%d", timestep)) return endpoint + "?" + q.Encode() } func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { dec := xml.NewDecoder(bytes.NewReader(data)) var ( param string ts time.Time maxObserved = minTime out []Observation inTVP bool inTime bool inValue bool ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, maxObserved, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { parts := strings.Split(a.Value, "-") if len(parts) > 0 { param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) } } } case "MeasurementTVP": inTVP = true ts = time.Time{} case "time": if inTVP { inTime = true } case "value": if inTVP { inValue = true } } case xml.CharData: if inTime && inTVP { s := strings.TrimSpace(string(el)) if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { ts = parsedTime } } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { s := strings.TrimSpace(string(el)) var valuePtr *float64 if !strings.EqualFold(s, "NaN") && s != "" { if value, err := strconv.ParseFloat(s, 64); err == nil { valuePtr = &value } } out = append(out, Observation{ Station: stationID, Parameter: param, Time: ts, Value: valuePtr, }) if ts.After(maxObserved) { maxObserved = ts } } case xml.EndElement: switch el.Name.Local { case "MeasurementTVP": inTVP = false case "time": inTime = false case "value": inValue = false } } } return out, maxObserved, nil } func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) return endpoint + "?" + q.Encode() } func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { dec := xml.NewDecoder(bytes.NewReader(data)) var ( param string runTime time.Time fcTime time.Time out []ForecastValue inTVP bool inTime bool inValue bool ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, runTime, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "resultTime": var s string if err := dec.DecodeElement(&s, &el); err == nil { if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { runTime = parsedTime } } case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { parts := strings.Split(a.Value, "-") if len(parts) > 0 { param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) } } } case "MeasurementTVP": inTVP = true fcTime = time.Time{} case "time": if inTVP { inTime = true } case "value": if inTVP { inValue = true } } case xml.CharData: if inTime && inTVP { s := strings.TrimSpace(string(el)) if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { fcTime = parsedTime } } else if inValue && inTVP && !fcTime.IsZero() { s := strings.TrimSpace(string(el)) var valuePtr *float64 if !strings.EqualFold(s, "NaN") && s != "" { if value, err := strconv.ParseFloat(s, 64); err == nil { valuePtr = &value } } fc := ForecastValue{ Location: struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{lat, lon}, Model: "harmonie", RunTime: runTime, ForecastTime: fcTime, Parameter: param, Value: valuePtr, } out = append(out, fc) } case xml.EndElement: switch el.Name.Local { case "MeasurementTVP": inTVP = false case "time": inTime = false case "value": inValue = false } } } return out, runTime, nil } func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) { serverURL, err := url.Parse(cfg.MQTTBroker) if err != nil { return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) } // Client configuration cliCfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), ClientConfig: paho.ClientConfig{ ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), OnPublishReceived: []func(paho.PublishReceived) (bool, error){ func(pr paho.PublishReceived) (bool, error) { logger.Debug("Received message", "topic", pr.Packet.Topic) return true, nil }, }, // Set OnDisconnect here OnServerDisconnect: func(d *paho.Disconnect) { if d != nil { logger.Warn("MQTT disconnected", "reason", d.ReasonCode) } else { logger.Info("MQTT disconnected gracefully") } }, }, ConnectUsername: cfg.MQTTUsername, ConnectPassword: []byte(cfg.MQTTPassword), ConnectTimeout: cfg.HTTPTimeout, } // MQTT v5 Connect properties - these need to be pointers cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds()) cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()) cliCfg.CleanStartOnInitialConnection = false // Last Will and Testament (LWT) cliCfg.WillMessage = &paho.WillMessage{ Topic: "weather/clients/status", Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)), QoS: 1, Retain: true, } // TLS configuration (if using TLS) if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { cliCfg.TlsCfg = &tls.Config{ MinVersion: tls.VersionTLS12, } } // Callbacks cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { logger.Info("MQTT v5 connected", "broker", serverURL.Host, "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil, "keep_alive", cfg.MQTTKeepAlive) } cliCfg.OnConnectError = func(err error) { logger.Error("MQTT connection error", "error", err) } // Connect to broker cm, err := autopaho.NewConnection(ctx, cliCfg) if err != nil { return nil, fmt.Errorf("failed to create MQTT connection: %w", err) } // Wait for connection to be established err = cm.AwaitConnection(ctx) if err != nil { return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) } return cm, nil } func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error { topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) // Safe JSON structure type SafeObservation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value *JSONFloat64 `json:"value,omitempty"` } var safeValue *JSONFloat64 if obs.Value != nil { v := JSONFloat64(*obs.Value) safeValue = &v } safeObs := SafeObservation{ Station: obs.Station, Parameter: obs.Parameter, Time: obs.Time, Value: safeValue, } b, err := json.Marshal(safeObs) if err != nil { return err } // MQTT v5 publish properties props := &paho.PublishProperties{ User: []paho.UserProperty{ {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, {Key: "parameter", Value: obs.Parameter}, {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, {Key: "data_type", Value: "observation"}, {Key: "source", Value: "fmi"}, }, } pb := &paho.Publish{ Topic: topic, QoS: 1, Retain: false, Payload: b, Properties: props, } // Publish with context timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err = cm.Publish(ctx, pb) if err != nil { return fmt.Errorf("failed to publish observation: %w", err) } logger.Debug("Published observation", "topic", topic, "station", obs.Station, "parameter", obs.Parameter, "time", obs.Time) return nil } func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error { topic := fmt.Sprintf( "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", fc.RunTime.Format(time.RFC3339), fc.Parameter, ) // Safe JSON structure type SafeForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value *JSONFloat64 `json:"value,omitempty"` } var safeValue *JSONFloat64 if fc.Value != nil { v := JSONFloat64(*fc.Value) safeValue = &v } safeFc := SafeForecastValue{ Location: fc.Location, Model: fc.Model, RunTime: fc.RunTime, ForecastTime: fc.ForecastTime, Parameter: fc.Parameter, Value: safeValue, } b, err := json.Marshal(safeFc) if err != nil { return err } // MQTT v5 publish properties props := &paho.PublishProperties{ User: []paho.UserProperty{ {Key: "model", Value: fc.Model}, {Key: "parameter", Value: fc.Parameter}, {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, {Key: "data_type", Value: "forecast"}, {Key: "source", Value: "fmi"}, {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, }, } pb := &paho.Publish{ Topic: topic, QoS: 1, Retain: true, // Forecasts are retained since they don't change Payload: b, Properties: props, } // Publish with context timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err = cm.Publish(ctx, pb) if err != nil { return fmt.Errorf("failed to publish forecast: %w", err) } logger.Debug("Published forecast", "topic", topic, "parameter", fc.Parameter, "run_time", fc.RunTime, "forecast_time", fc.ForecastTime) return nil } func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { // Load initial state lastObs := loadWatermark(cfg.WatermarkFile) lastFcRun := time.Time{} // Create MQTT client mqttClient, err := createMQTTClient(ctx, cfg, logger) if err != nil { return fmt.Errorf("failed to connect to MQTT: %w", err) } defer func() { // Graceful disconnect discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := mqttClient.Disconnect(discCtx); err != nil { logger.Error("Failed to disconnect MQTT", "error", err) } }() // Create timers obsTimer := time.NewTimer(0) // Start immediately fcTimer := time.NewTimer(0) // Start immediately defer obsTimer.Stop() defer fcTimer.Stop() logger.Info("Poller started", "obs_interval", cfg.ObsPollEvery, "fc_interval", cfg.FcPollEvery, "mqtt_broker", cfg.MQTTBroker) for { select { case <-ctx.Done(): logger.Info("Poller stopping") return nil case <-obsTimer.C: // Poll observations obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) if obsEnd.After(lastObs) { logger.Info("Polling observations", "from", lastObs, "to", obsEnd) url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) data, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch observations", "error", err) } else { obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger) if err != nil { logger.Error("Failed to parse observations", "error", err) } else if len(obs) > 0 { // Publish observations successCount := 0 for _, o := range obs { if err := publishObservation(mqttClient, o, logger); err != nil { logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) } else { successCount++ } } // Update watermark lastObs = maxT if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { logger.Error("Failed to store watermark", "error", err) } logger.Info("Published observations", "successful", successCount, "total", len(obs), "new_watermark", lastObs) } else { logger.Debug("No new observations found") } } } obsTimer.Reset(cfg.ObsPollEvery) case <-fcTimer.C: // Poll forecast now := time.Now().UTC() start := now end := now.Add(48 * time.Hour) logger.Info("Polling forecast", "from", start, "to", end) url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) data, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch forecast", "error", err) } else { fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger) if err != nil { logger.Error("Failed to parse forecast", "error", err) } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { // Publish forecast successCount := 0 for _, f := range fc { if err := publishForecast(mqttClient, f, logger); err != nil { logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) } else { successCount++ } } lastFcRun = runTime logger.Info("Published forecast", "successful", successCount, "total", len(fc), "run_time", runTime) } else { logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) } } fcTimer.Reset(cfg.FcPollEvery) } } } func main() { // Load configuration cfg := loadConfig() // Setup logging logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: parseLogLevel(cfg.LogLevel), })) // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Setup signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { // For now, we don't handle signals in this simplified version // In production, you'd want to handle SIGINT and SIGTERM <-sigChan logger.Info("Received shutdown signal") cancel() }() // Run the poller logger.Info("Starting weather data poller with MQTT v5") if err := runPoller(ctx, cfg, logger); err != nil { logger.Error("Poller failed", "error", err) os.Exit(1) } logger.Info("Poller stopped gracefully") }