package main import ( "context" "encoding/json" "encoding/xml" "errors" "fmt" "io" "log/slog" "net/http" "net/url" "os" "os/signal" "strconv" "strings" "sync" "sync/atomic" "syscall" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) /* ============================================================ Configuration (now configurable) ============================================================ */ type Config struct { // FMI API FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"` // Observation StationID int `env:"STATION_ID" default:"100968"` ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"` ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"` ObsTimestep int `env:"OBS_TIMESTEP" default:"60"` WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"` // Forecast FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"` HelLat float64 `env:"HEL_LAT" default:"60.1699"` HelLon float64 `env:"HEL_LON" default:"24.9384"` // MQTT MQTTBroker string `env:"MQTT_BROKER" default:"tcp://tammi.cc:1883"` MQTTClientID string `env:"MQTT_CLIENT_ID" default:"pub"` MQTTUsername string `env:"MQTT_USERNAME"` MQTTPassword string `env:"MQTT_PASSWORD"` // Application LogLevel string `env:"LOG_LEVEL" default:"info"` HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"` MaxRetries int `env:"MAX_RETRIES" default:"3"` RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"` } /* ============================================================ Application State ============================================================ */ type App struct { config *Config logger *slog.Logger mqttClient mqtt.Client httpClient *http.Client // State lastObsWatermark atomic.Value // stores time.Time lastForecastRun atomic.Value // stores time.Time shutdownCtx context.Context shutdownCancel context.CancelFunc // Wait group for graceful shutdown wg sync.WaitGroup } /* ============================================================ Domain models ============================================================ */ type Observation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` Value float64 `json:"value"` } type ForecastValue struct { Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } `json:"location"` Model string `json:"model"` RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` Value float64 `json:"value"` } /* ============================================================ Initialization ============================================================ */ func NewApp() (*App, error) { config := loadConfig() // Setup logging logLevel := parseLogLevel(config.LogLevel) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: logLevel, })) // Create HTTP client with timeout httpClient := &http.Client{ Timeout: config.HTTPTimeout, Transport: &http.Transport{ MaxIdleConns: 10, IdleConnTimeout: 30 * time.Second, DisableCompression: false, }, } // Create MQTT client mqttOpts := mqtt.NewClientOptions(). AddBroker(config.MQTTBroker). SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())). SetCleanSession(true). SetAutoReconnect(true). SetMaxReconnectInterval(10 * time.Second). SetConnectionLostHandler(func(c mqtt.Client, err error) { logger.Error("MQTT connection lost", "error", err) }). SetOnConnectHandler(func(c mqtt.Client) { logger.Info("MQTT connected successfully") }) if config.MQTTUsername != "" { mqttOpts.SetUsername(config.MQTTUsername) if config.MQTTPassword != "" { mqttOpts.SetPassword(config.MQTTPassword) } } mqttClient := mqtt.NewClient(mqttOpts) ctx, cancel := context.WithCancel(context.Background()) app := &App{ config: config, logger: logger, httpClient: httpClient, mqttClient: mqttClient, shutdownCtx: ctx, shutdownCancel: cancel, } // Initialize atomic values app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile)) app.lastForecastRun.Store(time.Time{}) return app, nil } func (app *App) ConnectMQTT() error { token := app.mqttClient.Connect() if !token.WaitTimeout(app.config.HTTPTimeout) { return errors.New("MQTT connection timeout") } if err := token.Error(); err != nil { return fmt.Errorf("MQTT connection failed: %w", err) } app.logger.Info("Connected to MQTT broker") return nil } /* ============================================================ Configuration loading ============================================================ */ func loadConfig() *Config { cfg := &Config{ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), StationID: parseInt(getEnv("STATION_ID", "100968")), ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), MQTTBroker: getEnv("MQTT_BROKER", "tcp://tammi.cc:1883"), MQTTClientID: getEnv("MQTT_CLIENT_ID", "pub"), MQTTUsername: getEnv("MQTT_USERNAME", "host"), MQTTPassword: getEnv("MQTT_PASSWORD", "salasana"), LogLevel: getEnv("LOG_LEVEL", "info"), HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")), RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")), } return cfg } func getEnv(key, defaultValue string) string { if value, exists := os.LookupEnv(key); exists { return value } return defaultValue } func parseDuration(s string) time.Duration { d, err := time.ParseDuration(s) if err != nil { panic(fmt.Sprintf("invalid duration %s: %v", s, err)) } return d } func parseInt(s string) int { i, err := strconv.Atoi(s) if err != nil { panic(fmt.Sprintf("invalid int %s: %v", s, err)) } return i } func parseFloat(s string) float64 { f, err := strconv.ParseFloat(s, 64) if err != nil { panic(fmt.Sprintf("invalid float %s: %v", s, err)) } return f } func parseLogLevel(level string) slog.Level { switch strings.ToLower(level) { case "debug": return slog.LevelDebug case "warn", "warning": return slog.LevelWarn case "error": return slog.LevelError default: return slog.LevelInfo } } /* ============================================================ Watermark persistence ============================================================ */ func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { // If file doesn't exist, start from 24 hours ago return time.Now().UTC().Add(-24 * time.Hour) } sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) if err != nil { return time.Now().UTC().Add(-24 * time.Hour) } return time.Unix(sec, 0).UTC() } func storeWatermark(filename string, t time.Time) error { data := []byte(fmt.Sprintf("%d", t.Unix())) return os.WriteFile(filename, data, 0644) } /* ============================================================ Retry helper ============================================================ */ func withRetry(app *App, name string, maxRetries int, fn func() error) error { var lastErr error for i := 0; i < maxRetries; i++ { if err := fn(); err != nil { lastErr = err app.logger.Warn("Operation failed, retrying", "operation", name, "attempt", i+1, "max_attempts", maxRetries, "error", err) if i < maxRetries-1 { select { case <-time.After(app.config.RetryDelay): continue case <-app.shutdownCtx.Done(): return app.shutdownCtx.Err() } } } else { return nil } } return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr) } /* ============================================================ HTTP helpers with retry ============================================================ */ func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) { var body io.ReadCloser err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error { req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil) if err != nil { return err } req.Header.Set("Accept", "application/xml") req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0") resp, err := app.httpClient.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) } body = resp.Body return nil }) if err != nil { return nil, err } return body, nil } /* ============================================================ Observation polling ============================================================ */ func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep)) url := app.config.FMIEndpoint + "?" + q.Encode() app.logger.Debug("Fetching observations", "url", url) return app.fetchWithRetry(url) } func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) { dec := xml.NewDecoder(r) var ( param string ts time.Time maxObserved = minTime out []Observation parseErr error ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, maxObserved, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { param = normalizeParameter(a.Value) } } case "time": var s string if err := dec.DecodeElement(&s, &el); err != nil { parseErr = fmt.Errorf("failed to decode time: %w", err) continue } ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) if err != nil { parseErr = fmt.Errorf("failed to parse time %s: %w", s, err) continue } case "value": var v float64 if err := dec.DecodeElement(&v, &el); err != nil { parseErr = fmt.Errorf("failed to decode value: %w", err) continue } if ts.After(minTime) { out = append(out, Observation{ Station: -1, // Will be set by caller Parameter: param, Time: ts, Value: v, }) if ts.After(maxObserved) { maxObserved = ts } } } } } if parseErr != nil { app.logger.Warn("Partial parse errors", "error", parseErr) } return out, maxObserved, nil } /* ============================================================ Forecast polling ============================================================ */ func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) url := app.config.FMIEndpoint + "?" + q.Encode() app.logger.Debug("Fetching forecast", "url", url) return app.fetchWithRetry(url) } func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) { dec := xml.NewDecoder(r) var ( param string runTime time.Time fcTime time.Time out []ForecastValue parseErr error ) for { tok, err := dec.Token() if err == io.EOF { break } if err != nil { return nil, runTime, err } switch el := tok.(type) { case xml.StartElement: switch el.Name.Local { case "resultTime": var s string if err := dec.DecodeElement(&s, &el); err != nil { parseErr = fmt.Errorf("failed to decode resultTime: %w", err) continue } runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) if err != nil { parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err) continue } case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { param = normalizeParameter(a.Value) } } case "time": var s string if err := dec.DecodeElement(&s, &el); err != nil { parseErr = fmt.Errorf("failed to decode forecast time: %w", err) continue } fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) if err != nil { parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err) continue } case "value": var v float64 if err := dec.DecodeElement(&v, &el); err != nil { parseErr = fmt.Errorf("failed to decode forecast value: %w", err) continue } out = append(out, ForecastValue{ Model: "harmonie", RunTime: runTime, ForecastTime: fcTime, Parameter: param, Value: v, }) } } } if parseErr != nil { app.logger.Warn("Partial parse errors in forecast", "error", parseErr) } return out, runTime, nil } /* ============================================================ Polling workers ============================================================ */ func (app *App) runObservationPoller() { defer app.wg.Done() app.logger.Info("Starting observation poller", "poll_interval", app.config.ObsPollEvery, "station", app.config.StationID) ticker := time.NewTicker(app.config.ObsPollEvery) defer ticker.Stop() for { select { case <-app.shutdownCtx.Done(): app.logger.Info("Observation poller stopping") return case <-ticker.C: app.pollObservations() } } } func (app *App) pollObservations() { startTime := time.Now() lastObs := app.lastObsWatermark.Load().(time.Time) obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag) if !obsEnd.After(lastObs) { return // Nothing new to fetch } app.logger.Info("Polling observations", "start", lastObs, "end", obsEnd) body, err := app.fetchObservations(lastObs, obsEnd) if err != nil { app.logger.Error("Failed to fetch observations", "error", err) return } defer body.Close() obs, maxT, err := app.parseObservations(body, lastObs) if err != nil { app.logger.Error("Failed to parse observations", "error", err) return } // Set station ID for all observations for i := range obs { obs[i].Station = app.config.StationID } if len(obs) > 0 { if err := app.publishObs(obs); err != nil { app.logger.Error("Failed to publish observations", "error", err) return } app.lastObsWatermark.Store(maxT) if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil { app.logger.Error("Failed to store watermark", "error", err) } app.logger.Info("Published observations", "count", len(obs), "new_watermark", maxT, "duration", time.Since(startTime)) } else { app.logger.Debug("No new observations") } } func (app *App) runForecastPoller() { defer app.wg.Done() app.logger.Info("Starting forecast poller", "poll_interval", app.config.FcPollEvery, "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) ticker := time.NewTicker(app.config.FcPollEvery) defer ticker.Stop() for { select { case <-app.shutdownCtx.Done(): app.logger.Info("Forecast poller stopping") return case <-ticker.C: app.pollForecast() } } } func (app *App) pollForecast() { startTime := time.Now() now := time.Now().UTC() start := now end := now.Add(48 * time.Hour) app.logger.Info("Polling forecast", "start", start, "end", end) body, err := app.fetchForecast(start, end) if err != nil { app.logger.Error("Failed to fetch forecast", "error", err) return } defer body.Close() fc, runTime, err := app.parseForecast(body) if err != nil { app.logger.Error("Failed to parse forecast", "error", err) return } // Set location for all forecasts for i := range fc { fc[i].Location.Lat = app.config.HelLat fc[i].Location.Lon = app.config.HelLon } if len(fc) > 0 { // Check if this is a new forecast run lastRun := app.lastForecastRun.Load().(time.Time) if runTime.After(lastRun) { if err := app.publishForecast(fc); err != nil { app.logger.Error("Failed to publish forecast", "error", err) return } app.lastForecastRun.Store(runTime) app.logger.Info("Published forecast", "count", len(fc), "run_time", runTime, "duration", time.Since(startTime)) } else { app.logger.Debug("Forecast run already published", "run_time", runTime) } } } /* ============================================================ MQTT publishing with error handling ============================================================ */ func (app *App) publishObs(obs []Observation) error { for _, o := range obs { topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter) b, err := json.Marshal(o) if err != nil { return fmt.Errorf("failed to marshal observation: %w", err) } token := app.mqttClient.Publish(topic, 1, false, b) if !token.WaitTimeout(app.config.HTTPTimeout) { return errors.New("MQTT publish timeout") } if err := token.Error(); err != nil { return fmt.Errorf("failed to publish observation: %w", err) } } return nil } func (app *App) publishForecast(fc []ForecastValue) error { for _, v := range fc { topic := fmt.Sprintf( "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", v.RunTime.Format(time.RFC3339), v.Parameter, ) b, err := json.Marshal(v) if err != nil { return fmt.Errorf("failed to marshal forecast: %w", err) } token := app.mqttClient.Publish(topic, 1, true, b) if !token.WaitTimeout(app.config.HTTPTimeout) { return errors.New("MQTT publish timeout") } if err := token.Error(); err != nil { return fmt.Errorf("failed to publish forecast: %w", err) } } return nil } /* ============================================================ Utilities ============================================================ */ func normalizeParameter(id string) string { return strings.TrimPrefix(strings.ToLower(id), "ts_") } /* ============================================================ Main ============================================================ */ func main() { // Create and initialize app app, err := NewApp() if err != nil { panic(fmt.Sprintf("Failed to initialize app: %v", err)) } defer app.shutdownCancel() // Setup signal handling signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) // Connect to MQTT if err := app.ConnectMQTT(); err != nil { app.logger.Error("Failed to connect to MQTT", "error", err) os.Exit(1) } defer app.mqttClient.Disconnect(250) // Start pollers app.wg.Add(2) go app.runObservationPoller() go app.runForecastPoller() app.logger.Info("Weather poller started") // Wait for shutdown signal select { case sig := <-signalChan: app.logger.Info("Received shutdown signal", "signal", sig) app.shutdownCancel() case <-app.shutdownCtx.Done(): app.logger.Info("Shutdown initiated") } // Wait for graceful shutdown shutdownDone := make(chan struct{}) go func() { app.wg.Wait() close(shutdownDone) }() select { case <-shutdownDone: app.logger.Info("Shutdown completed") case <-time.After(30 * time.Second): app.logger.Warn("Shutdown timed out") } }