diff options
Diffstat (limited to 'main.go')
| -rw-r--r-- | main.go | 769 |
1 files changed, 769 insertions, 0 deletions
@@ -0,0 +1,769 @@ +package main + +import ( + "context" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +/* ============================================================ + Configuration (now configurable) + ============================================================ */ + +type Config struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"` + + // Observation + StationID int `env:"STATION_ID" default:"100968"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"` + ObsTimestep int `env:"OBS_TIMESTEP" default:"60"` + WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"` + HelLat float64 `env:"HEL_LAT" default:"60.1699"` + HelLon float64 `env:"HEL_LON" default:"24.9384"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER" default:"tcp://localhost:1883"` + MQTTClientID string `env:"MQTT_CLIENT_ID" default:"fmi-poller"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + + // Application + LogLevel string `env:"LOG_LEVEL" default:"info"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"` + MaxRetries int `env:"MAX_RETRIES" default:"3"` + RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"` +} + +/* ============================================================ + Application State + ============================================================ */ + +type App struct { + config *Config + logger *slog.Logger + mqttClient mqtt.Client + httpClient *http.Client + + // State + lastObsWatermark atomic.Value // stores time.Time + lastForecastRun atomic.Value // stores time.Time + shutdownCtx context.Context + shutdownCancel context.CancelFunc + + // Wait group for graceful shutdown + wg sync.WaitGroup +} + +/* ============================================================ + Domain models + ============================================================ */ + +type Observation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value float64 `json:"value"` +} + +type ForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value float64 `json:"value"` +} + +/* ============================================================ + Initialization + ============================================================ */ + +func NewApp() (*App, error) { + config := loadConfig() + + // Setup logging + logLevel := parseLogLevel(config.LogLevel) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })) + + // Create HTTP client with timeout + httpClient := &http.Client{ + Timeout: config.HTTPTimeout, + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: false, + }, + } + + // Create MQTT client + mqttOpts := mqtt.NewClientOptions(). + AddBroker(config.MQTTBroker). + SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())). + SetCleanSession(true). + SetAutoReconnect(true). + SetMaxReconnectInterval(10 * time.Second). + SetConnectionLostHandler(func(c mqtt.Client, err error) { + logger.Error("MQTT connection lost", "error", err) + }). + SetOnConnectHandler(func(c mqtt.Client) { + logger.Info("MQTT connected successfully") + }) + + if config.MQTTUsername != "" { + mqttOpts.SetUsername(config.MQTTUsername) + if config.MQTTPassword != "" { + mqttOpts.SetPassword(config.MQTTPassword) + } + } + + mqttClient := mqtt.NewClient(mqttOpts) + + ctx, cancel := context.WithCancel(context.Background()) + + app := &App{ + config: config, + logger: logger, + httpClient: httpClient, + mqttClient: mqttClient, + shutdownCtx: ctx, + shutdownCancel: cancel, + } + + // Initialize atomic values + app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile)) + app.lastForecastRun.Store(time.Time{}) + + return app, nil +} + +func (app *App) ConnectMQTT() error { + token := app.mqttClient.Connect() + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT connection timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("MQTT connection failed: %w", err) + } + app.logger.Info("Connected to MQTT broker") + return nil +} + +/* ============================================================ + Configuration loading + ============================================================ */ + +func loadConfig() *Config { + cfg := &Config{ + FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: parseInt(getEnv("STATION_ID", "100968")), + ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), + WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), + HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), + HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), + MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-poller"), + MQTTUsername: getEnv("MQTT_USERNAME", ""), + MQTTPassword: getEnv("MQTT_PASSWORD", ""), + LogLevel: getEnv("LOG_LEVEL", "info"), + HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), + MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")), + RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")), + } + return cfg +} + +func getEnv(key, defaultValue string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return defaultValue +} + +func parseDuration(s string) time.Duration { + d, err := time.ParseDuration(s) + if err != nil { + panic(fmt.Sprintf("invalid duration %s: %v", s, err)) + } + return d +} + +func parseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + panic(fmt.Sprintf("invalid int %s: %v", s, err)) + } + return i +} + +func parseFloat(s string) float64 { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + panic(fmt.Sprintf("invalid float %s: %v", s, err)) + } + return f +} + +func parseLogLevel(level string) slog.Level { + switch strings.ToLower(level) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} + +/* ============================================================ + Watermark persistence + ============================================================ */ + +func loadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + // If file doesn't exist, start from 24 hours ago + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +func storeWatermark(filename string, t time.Time) error { + data := []byte(fmt.Sprintf("%d", t.Unix())) + return os.WriteFile(filename, data, 0644) +} + +/* ============================================================ + Retry helper + ============================================================ */ + +func withRetry(app *App, name string, maxRetries int, fn func() error) error { + var lastErr error + for i := 0; i < maxRetries; i++ { + if err := fn(); err != nil { + lastErr = err + app.logger.Warn("Operation failed, retrying", + "operation", name, + "attempt", i+1, + "max_attempts", maxRetries, + "error", err) + + if i < maxRetries-1 { + select { + case <-time.After(app.config.RetryDelay): + continue + case <-app.shutdownCtx.Done(): + return app.shutdownCtx.Err() + } + } + } else { + return nil + } + } + return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr) +} + +/* ============================================================ + HTTP helpers with retry + ============================================================ */ + +func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) { + var body io.ReadCloser + err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error { + req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil) + if err != nil { + return err + } + req.Header.Set("Accept", "application/xml") + req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0") + + resp, err := app.httpClient.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + body = resp.Body + return nil + }) + + if err != nil { + return nil, err + } + return body, nil +} + +/* ============================================================ + Observation polling + ============================================================ */ + +func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") + q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep)) + + url := app.config.FMIEndpoint + "?" + q.Encode() + app.logger.Debug("Fetching observations", "url", url) + return app.fetchWithRetry(url) +} + +func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(r) + + var ( + param string + ts time.Time + maxObserved = minTime + out []Observation + parseErr error + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, maxObserved, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + param = normalizeParameter(a.Value) + } + } + case "time": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode time: %w", err) + continue + } + ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse time %s: %w", s, err) + continue + } + case "value": + var v float64 + if err := dec.DecodeElement(&v, &el); err != nil { + parseErr = fmt.Errorf("failed to decode value: %w", err) + continue + } + if ts.After(minTime) { + out = append(out, Observation{ + Station: -1, // Will be set by caller + Parameter: param, + Time: ts, + Value: v, + }) + if ts.After(maxObserved) { + maxObserved = ts + } + } + } + } + } + + if parseErr != nil { + app.logger.Warn("Partial parse errors", "error", parseErr) + } + + return out, maxObserved, nil +} + +/* ============================================================ + Forecast polling + ============================================================ */ + +func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + + url := app.config.FMIEndpoint + "?" + q.Encode() + app.logger.Debug("Fetching forecast", "url", url) + return app.fetchWithRetry(url) +} + +func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(r) + + var ( + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + parseErr error + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, runTime, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "resultTime": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode resultTime: %w", err) + continue + } + runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err) + continue + } + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + param = normalizeParameter(a.Value) + } + } + case "time": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode forecast time: %w", err) + continue + } + fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err) + continue + } + case "value": + var v float64 + if err := dec.DecodeElement(&v, &el); err != nil { + parseErr = fmt.Errorf("failed to decode forecast value: %w", err) + continue + } + out = append(out, ForecastValue{ + Model: "harmonie", + RunTime: runTime, + ForecastTime: fcTime, + Parameter: param, + Value: v, + }) + } + } + } + + if parseErr != nil { + app.logger.Warn("Partial parse errors in forecast", "error", parseErr) + } + + return out, runTime, nil +} + +/* ============================================================ + Polling workers + ============================================================ */ + +func (app *App) runObservationPoller() { + defer app.wg.Done() + + app.logger.Info("Starting observation poller", + "poll_interval", app.config.ObsPollEvery, + "station", app.config.StationID) + + ticker := time.NewTicker(app.config.ObsPollEvery) + defer ticker.Stop() + + for { + select { + case <-app.shutdownCtx.Done(): + app.logger.Info("Observation poller stopping") + return + case <-ticker.C: + app.pollObservations() + } + } +} + +func (app *App) pollObservations() { + startTime := time.Now() + lastObs := app.lastObsWatermark.Load().(time.Time) + obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag) + + if !obsEnd.After(lastObs) { + return // Nothing new to fetch + } + + app.logger.Info("Polling observations", + "start", lastObs, + "end", obsEnd) + + body, err := app.fetchObservations(lastObs, obsEnd) + if err != nil { + app.logger.Error("Failed to fetch observations", "error", err) + return + } + defer body.Close() + + obs, maxT, err := app.parseObservations(body, lastObs) + if err != nil { + app.logger.Error("Failed to parse observations", "error", err) + return + } + + // Set station ID for all observations + for i := range obs { + obs[i].Station = app.config.StationID + } + + if len(obs) > 0 { + if err := app.publishObs(obs); err != nil { + app.logger.Error("Failed to publish observations", "error", err) + return + } + + app.lastObsWatermark.Store(maxT) + if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil { + app.logger.Error("Failed to store watermark", "error", err) + } + + app.logger.Info("Published observations", + "count", len(obs), + "new_watermark", maxT, + "duration", time.Since(startTime)) + } else { + app.logger.Debug("No new observations") + } +} + +func (app *App) runForecastPoller() { + defer app.wg.Done() + + app.logger.Info("Starting forecast poller", + "poll_interval", app.config.FcPollEvery, + "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + + ticker := time.NewTicker(app.config.FcPollEvery) + defer ticker.Stop() + + for { + select { + case <-app.shutdownCtx.Done(): + app.logger.Info("Forecast poller stopping") + return + case <-ticker.C: + app.pollForecast() + } + } +} + +func (app *App) pollForecast() { + startTime := time.Now() + now := time.Now().UTC() + start := now + end := now.Add(48 * time.Hour) + + app.logger.Info("Polling forecast", + "start", start, + "end", end) + + body, err := app.fetchForecast(start, end) + if err != nil { + app.logger.Error("Failed to fetch forecast", "error", err) + return + } + defer body.Close() + + fc, runTime, err := app.parseForecast(body) + if err != nil { + app.logger.Error("Failed to parse forecast", "error", err) + return + } + + // Set location for all forecasts + for i := range fc { + fc[i].Location.Lat = app.config.HelLat + fc[i].Location.Lon = app.config.HelLon + } + + if len(fc) > 0 { + // Check if this is a new forecast run + lastRun := app.lastForecastRun.Load().(time.Time) + if runTime.After(lastRun) { + if err := app.publishForecast(fc); err != nil { + app.logger.Error("Failed to publish forecast", "error", err) + return + } + + app.lastForecastRun.Store(runTime) + app.logger.Info("Published forecast", + "count", len(fc), + "run_time", runTime, + "duration", time.Since(startTime)) + } else { + app.logger.Debug("Forecast run already published", "run_time", runTime) + } + } +} + +/* ============================================================ + MQTT publishing with error handling + ============================================================ */ + +func (app *App) publishObs(obs []Observation) error { + for _, o := range obs { + topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter) + b, err := json.Marshal(o) + if err != nil { + return fmt.Errorf("failed to marshal observation: %w", err) + } + + token := app.mqttClient.Publish(topic, 1, false, b) + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT publish timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("failed to publish observation: %w", err) + } + } + return nil +} + +func (app *App) publishForecast(fc []ForecastValue) error { + for _, v := range fc { + topic := fmt.Sprintf( + "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", + v.RunTime.Format(time.RFC3339), + v.Parameter, + ) + b, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal forecast: %w", err) + } + + token := app.mqttClient.Publish(topic, 1, true, b) + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT publish timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("failed to publish forecast: %w", err) + } + } + return nil +} + +/* ============================================================ + Utilities + ============================================================ */ + +func normalizeParameter(id string) string { + return strings.TrimPrefix(strings.ToLower(id), "ts_") +} + +/* ============================================================ + Main + ============================================================ */ + +func main() { + // Create and initialize app + app, err := NewApp() + if err != nil { + panic(fmt.Sprintf("Failed to initialize app: %v", err)) + } + defer app.shutdownCancel() + + // Setup signal handling + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + // Connect to MQTT + if err := app.ConnectMQTT(); err != nil { + app.logger.Error("Failed to connect to MQTT", "error", err) + os.Exit(1) + } + defer app.mqttClient.Disconnect(250) + + // Start pollers + app.wg.Add(2) + go app.runObservationPoller() + go app.runForecastPoller() + + app.logger.Info("Weather poller started") + + // Wait for shutdown signal + select { + case sig := <-signalChan: + app.logger.Info("Received shutdown signal", "signal", sig) + app.shutdownCancel() + case <-app.shutdownCtx.Done(): + app.logger.Info("Shutdown initiated") + } + + // Wait for graceful shutdown + shutdownDone := make(chan struct{}) + go func() { + app.wg.Wait() + close(shutdownDone) + }() + + select { + case <-shutdownDone: + app.logger.Info("Shutdown completed") + case <-time.After(30 * time.Second): + app.logger.Warn("Shutdown timed out") + } +} |
