package main import ( "context" "encoding/json" "encoding/xml" "fmt" "io" "log/slog" "math" "net/http" "net/url" "os" "strconv" "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/joho/godotenv" ) /* ============================================================ Configuration ============================================================ */ type Config struct { // FMI API FMIEndpoint string `env:"FMI_ENDPOINT"` // Observation StationID int `env:"STATION_ID"` ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` ObsTimestep int `env:"OBS_TIMESTEP"` WatermarkFile string `env:"WATERMARK_FILE"` // Forecast FcPollEvery time.Duration `env:"FC_POLL_EVERY"` HelLat float64 `env:"HEL_LAT"` HelLon float64 `env:"HEL_LON"` // MQTT MQTTBroker string `env:"MQTT_BROKER"` MQTTClientID string `env:"MQTT_CLIENT_ID"` MQTTUsername string `env:"MQTT_USERNAME"` MQTTPassword string `env:"MQTT_PASSWORD"` // Application LogLevel string `env:"LOG_LEVEL"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` } /* ============================================================ Domain models ============================================================ */ type Observation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value *float64 `json:"value,omitempty"` } type ForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value *float64 `json:"value,omitempty"` } /* ============================================================ JSON Float for NaN handling ============================================================ */ type JSONFloat64 float64 func (f JSONFloat64) MarshalJSON() ([]byte, error) { if math.IsNaN(float64(f)) { return []byte("null"), nil } return json.Marshal(float64(f)) } /* ============================================================ Load Configuration ============================================================ */ func loadConfig() *Config { // Load .env file if it exists _ = godotenv.Load() cfg := &Config{ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), StationID: parseInt(getEnv("STATION_ID", "100968")), ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), MQTTUsername: getEnv("MQTT_USERNAME", ""), MQTTPassword: getEnv("MQTT_PASSWORD", ""), LogLevel: getEnv("LOG_LEVEL", "info"), HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), } return cfg } func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue } func parseDuration(s string) time.Duration { d, err := time.ParseDuration(s) if err != nil { return 5 * time.Minute // Default fallback } return d } func parseInt(s string) int { i, err := strconv.Atoi(s) if err != nil { return 0 } return i } func parseFloat(s string) float64 { f, err := strconv.ParseFloat(s, 64) if err != nil { return 0 } return f } func parseLogLevel(level string) slog.Level { switch strings.ToLower(level) { case "debug": return slog.LevelDebug case "warn", "warning": return slog.LevelWarn case "error": return slog.LevelError default: return slog.LevelInfo } } /* ============================================================ Watermark persistence ============================================================ */ func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } return time.Unix(sec, 0).UTC() } func storeWatermark(filename string, t time.Time) error { data := []byte(fmt.Sprintf("%d", t.Unix())) return os.WriteFile(filename, data, 0644) } /* ============================================================ HTTP Fetch ============================================================ */ func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/xml") resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) } return resp.Body, nil } /* ============================================================ Observation Polling ============================================================ */ func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") q.Set("fmisid", fmt.Sprintf("%d", stationID)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) q.Set("timestep", fmt.Sprintf("%d", timestep)) return endpoint + "?" + q.Encode() } func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { dec := xml.NewDecoder(r) var ( param string ts time.Time maxObserved = minTime out []Observation inTVP bool inTime bool inValue bool ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, maxObserved, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { parts := strings.Split(a.Value, "-") if len(parts) > 0 { param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) } } } case "MeasurementTVP": inTVP = true ts = time.Time{} case "time": if inTVP { inTime = true } case "value": if inTVP { inValue = true } } case xml.CharData: if inTime && inTVP { s := strings.TrimSpace(string(el)) if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { ts = parsedTime } } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { s := strings.TrimSpace(string(el)) var valuePtr *float64 if !strings.EqualFold(s, "NaN") && s != "" { if value, err := strconv.ParseFloat(s, 64); err == nil { valuePtr = &value } } out = append(out, Observation{ Station: stationID, Parameter: param, Time: ts, Value: valuePtr, }) if ts.After(maxObserved) { maxObserved = ts } } case xml.EndElement: switch el.Name.Local { case "MeasurementTVP": inTVP = false case "time": inTime = false case "value": inValue = false } } } return out, maxObserved, nil } /* ============================================================ Forecast Polling ============================================================ */ func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) return endpoint + "?" + q.Encode() } func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { dec := xml.NewDecoder(r) var ( param string runTime time.Time fcTime time.Time out []ForecastValue inTVP bool inTime bool inValue bool ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, runTime, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "resultTime": var s string if err := dec.DecodeElement(&s, &el); err == nil { if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { runTime = parsedTime } } case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { parts := strings.Split(a.Value, "-") if len(parts) > 0 { param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) } } } case "MeasurementTVP": inTVP = true fcTime = time.Time{} case "time": if inTVP { inTime = true } case "value": if inTVP { inValue = true } } case xml.CharData: if inTime && inTVP { s := strings.TrimSpace(string(el)) if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { fcTime = parsedTime } } else if inValue && inTVP && !fcTime.IsZero() { s := strings.TrimSpace(string(el)) var valuePtr *float64 if !strings.EqualFold(s, "NaN") && s != "" { if value, err := strconv.ParseFloat(s, 64); err == nil { valuePtr = &value } } fc := ForecastValue{ Location: struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{lat, lon}, Model: "harmonie", RunTime: runTime, ForecastTime: fcTime, Parameter: param, Value: valuePtr, } out = append(out, fc) } case xml.EndElement: switch el.Name.Local { case "MeasurementTVP": inTVP = false case "time": inTime = false case "value": inValue = false } } } return out, runTime, nil } /* ============================================================ MQTT Publishing ============================================================ */ func createMQTTClient(cfg *Config) (mqtt.Client, error) { opts := mqtt.NewClientOptions(). AddBroker(cfg.MQTTBroker). SetClientID(fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())). SetCleanSession(true) if cfg.MQTTUsername != "" { opts.SetUsername(cfg.MQTTUsername) if cfg.MQTTPassword != "" { opts.SetPassword(cfg.MQTTPassword) } } client := mqtt.NewClient(opts) token := client.Connect() if !token.WaitTimeout(cfg.HTTPTimeout) { return nil, fmt.Errorf("MQTT connection timeout") } if err := token.Error(); err != nil { return nil, err } return client, nil } func publishObservation(client mqtt.Client, obs Observation, timeout time.Duration) error { topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) // Safe JSON structure type SafeObservation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value *JSONFloat64 `json:"value,omitempty"` } var safeValue *JSONFloat64 if obs.Value != nil { v := JSONFloat64(*obs.Value) safeValue = &v } safeObs := SafeObservation{ Station: obs.Station, Parameter: obs.Parameter, Time: obs.Time, Value: safeValue, } b, err := json.Marshal(safeObs) if err != nil { return err } token := client.Publish(topic, 1, false, b) if !token.WaitTimeout(timeout) { return fmt.Errorf("publish timeout") } return token.Error() } func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration) error { topic := fmt.Sprintf( "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", fc.RunTime.Format(time.RFC3339), fc.Parameter, ) // Safe JSON structure type SafeForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value *JSONFloat64 `json:"value,omitempty"` } var safeValue *JSONFloat64 if fc.Value != nil { v := JSONFloat64(*fc.Value) safeValue = &v } safeFc := SafeForecastValue{ Location: fc.Location, Model: fc.Model, RunTime: fc.RunTime, ForecastTime: fc.ForecastTime, Parameter: fc.Parameter, Value: safeValue, } b, err := json.Marshal(safeFc) if err != nil { return err } token := client.Publish(topic, 1, true, b) if !token.WaitTimeout(timeout) { return fmt.Errorf("publish timeout") } return token.Error() } /* ============================================================ Single Poller ============================================================ */ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { // Load initial state lastObs := loadWatermark(cfg.WatermarkFile) lastFcRun := time.Time{} // Create timers obsTimer := time.NewTimer(0) // Start immediately fcTimer := time.NewTimer(0) // Start immediately defer obsTimer.Stop() defer fcTimer.Stop() // Create MQTT client mqttClient, err := createMQTTClient(cfg) if err != nil { return fmt.Errorf("failed to connect to MQTT: %w", err) } defer mqttClient.Disconnect(250) logger.Info("Poller started", "obs_interval", cfg.ObsPollEvery, "fc_interval", cfg.FcPollEvery) for { select { case <-ctx.Done(): logger.Info("Poller stopping") return nil case <-obsTimer.C: // Poll observations obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) if obsEnd.After(lastObs) { logger.Info("Polling observations", "from", lastObs, "to", obsEnd) url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) body, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch observations", "error", err) } else { obs, maxT, err := parseObservations(body, cfg.StationID, lastObs, logger) body.Close() if err != nil { logger.Error("Failed to parse observations", "error", err) } else if len(obs) > 0 { // Publish observations for _, o := range obs { if err := publishObservation(mqttClient, o, cfg.HTTPTimeout); err != nil { logger.Error("Failed to publish observation", "error", err) } } // Update watermark lastObs = maxT if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { logger.Error("Failed to store watermark", "error", err) } logger.Info("Published observations", "count", len(obs), "new_watermark", lastObs) } } } obsTimer.Reset(cfg.ObsPollEvery) case <-fcTimer.C: // Poll forecast now := time.Now().UTC() start := now end := now.Add(48 * time.Hour) logger.Info("Polling forecast", "from", start, "to", end) url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) body, err := fetchData(url, cfg.HTTPTimeout) if err != nil { logger.Error("Failed to fetch forecast", "error", err) } else { fc, runTime, err := parseForecast(body, cfg.HelLat, cfg.HelLon, logger) body.Close() if err != nil { logger.Error("Failed to parse forecast", "error", err) } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { // Publish forecast for _, f := range fc { if err := publishForecast(mqttClient, f, cfg.HTTPTimeout); err != nil { logger.Error("Failed to publish forecast", "error", err) } } lastFcRun = runTime logger.Info("Published forecast", "count", len(fc), "run_time", runTime) } } fcTimer.Reset(cfg.FcPollEvery) } } } /* ============================================================ Main ============================================================ */ func main() { // Load configuration cfg := loadConfig() // Setup logging logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: parseLogLevel(cfg.LogLevel), })) // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Run the poller logger.Info("Starting weather data poller") if err := runPoller(ctx, cfg, logger); err != nil { logger.Error("Poller failed", "error", err) os.Exit(1) } logger.Info("Poller stopped gracefully") }