diff options
| author | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-03 11:31:14 +0200 |
|---|---|---|
| committer | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-03 11:31:14 +0200 |
| commit | 0e6085003e4b156595c039411855aec5b4b94623 (patch) | |
| tree | 6f4a940d50c41e0d5df7cfd003de5880b3e7de54 | |
| parent | efa05d7633e97832e28d4e051b42c92d01026514 (diff) | |
| download | pub-0e6085003e4b156595c039411855aec5b4b94623.tar.zst | |
Third commit
| -rw-r--r-- | go.mod | 5 | ||||
| -rw-r--r-- | go.sum | 2 | ||||
| -rw-r--r-- | main.go | 811 |
3 files changed, 366 insertions, 452 deletions
@@ -2,7 +2,10 @@ module pub go 1.25.5 -require github.com/eclipse/paho.mqtt.golang v1.5.1 +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/joho/godotenv v1.5.1 +) require ( github.com/gorilla/websocket v1.5.3 // indirect @@ -2,6 +2,8 @@ github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2I github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= @@ -4,75 +4,50 @@ import ( "context" "encoding/json" "encoding/xml" - "errors" "fmt" "io" "log/slog" + "math" "net/http" "net/url" "os" - "os/signal" "strconv" "strings" - "sync" - "sync/atomic" - "syscall" "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/joho/godotenv" ) /* ============================================================ - Configuration (now configurable) + Configuration ============================================================ */ type Config struct { // FMI API - FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"` + FMIEndpoint string `env:"FMI_ENDPOINT"` // Observation - StationID int `env:"STATION_ID" default:"100968"` - ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"` - ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"` - ObsTimestep int `env:"OBS_TIMESTEP" default:"60"` - WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"` + StationID int `env:"STATION_ID"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` + ObsTimestep int `env:"OBS_TIMESTEP"` + WatermarkFile string `env:"WATERMARK_FILE"` // Forecast - FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"` - HelLat float64 `env:"HEL_LAT" default:"60.1699"` - HelLon float64 `env:"HEL_LON" default:"24.9384"` + FcPollEvery time.Duration `env:"FC_POLL_EVERY"` + HelLat float64 `env:"HEL_LAT"` + HelLon float64 `env:"HEL_LON"` // MQTT - MQTTBroker string `env:"MQTT_BROKER" default:"tcp://tammi.cc:1883"` - MQTTClientID string `env:"MQTT_CLIENT_ID" default:"pub"` + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` MQTTUsername string `env:"MQTT_USERNAME"` MQTTPassword string `env:"MQTT_PASSWORD"` // Application - LogLevel string `env:"LOG_LEVEL" default:"info"` - HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"` - MaxRetries int `env:"MAX_RETRIES" default:"3"` - RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"` -} - -/* ============================================================ - Application State - ============================================================ */ - -type App struct { - config *Config - logger *slog.Logger - mqttClient mqtt.Client - httpClient *http.Client - - // State - lastObsWatermark atomic.Value // stores time.Time - lastForecastRun atomic.Value // stores time.Time - shutdownCtx context.Context - shutdownCancel context.CancelFunc - - // Wait group for graceful shutdown - wg sync.WaitGroup + LogLevel string `env:"LOG_LEVEL"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` } /* ============================================================ @@ -83,7 +58,7 @@ type Observation struct { Station int `json:"station"` Parameter string `json:"parameter"` Time time.Time `json:"time"` - Value float64 `json:"value"` + Value *float64 `json:"value,omitempty"` } type ForecastValue struct { @@ -96,90 +71,30 @@ type ForecastValue struct { RunTime time.Time `json:"run_time"` ForecastTime time.Time `json:"forecast_time"` Parameter string `json:"parameter"` - Value float64 `json:"value"` + Value *float64 `json:"value,omitempty"` } /* ============================================================ - Initialization + JSON Float for NaN handling ============================================================ */ -func NewApp() (*App, error) { - config := loadConfig() - - // Setup logging - logLevel := parseLogLevel(config.LogLevel) - logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: logLevel, - })) - - // Create HTTP client with timeout - httpClient := &http.Client{ - Timeout: config.HTTPTimeout, - Transport: &http.Transport{ - MaxIdleConns: 10, - IdleConnTimeout: 30 * time.Second, - DisableCompression: false, - }, - } - - // Create MQTT client - mqttOpts := mqtt.NewClientOptions(). - AddBroker(config.MQTTBroker). - SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())). - SetCleanSession(true). - SetAutoReconnect(true). - SetMaxReconnectInterval(10 * time.Second). - SetConnectionLostHandler(func(c mqtt.Client, err error) { - logger.Error("MQTT connection lost", "error", err) - }). - SetOnConnectHandler(func(c mqtt.Client) { - logger.Info("MQTT connected successfully") - }) - - if config.MQTTUsername != "" { - mqttOpts.SetUsername(config.MQTTUsername) - if config.MQTTPassword != "" { - mqttOpts.SetPassword(config.MQTTPassword) - } - } - - mqttClient := mqtt.NewClient(mqttOpts) - - ctx, cancel := context.WithCancel(context.Background()) - - app := &App{ - config: config, - logger: logger, - httpClient: httpClient, - mqttClient: mqttClient, - shutdownCtx: ctx, - shutdownCancel: cancel, - } - - // Initialize atomic values - app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile)) - app.lastForecastRun.Store(time.Time{}) - - return app, nil -} +type JSONFloat64 float64 -func (app *App) ConnectMQTT() error { - token := app.mqttClient.Connect() - if !token.WaitTimeout(app.config.HTTPTimeout) { - return errors.New("MQTT connection timeout") - } - if err := token.Error(); err != nil { - return fmt.Errorf("MQTT connection failed: %w", err) +func (f JSONFloat64) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + return []byte("null"), nil } - app.logger.Info("Connected to MQTT broker") - return nil + return json.Marshal(float64(f)) } /* ============================================================ - Configuration loading + Load Configuration ============================================================ */ func loadConfig() *Config { + // Load .env file if it exists + _ = godotenv.Load() + cfg := &Config{ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), StationID: parseInt(getEnv("STATION_ID", "100968")), @@ -190,20 +105,19 @@ func loadConfig() *Config { FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), - MQTTBroker: getEnv("MQTT_BROKER", "tcp://tammi.cc:1883"), - MQTTClientID: getEnv("MQTT_CLIENT_ID", "pub"), - MQTTUsername: getEnv("MQTT_USERNAME", "host"), - MQTTPassword: getEnv("MQTT_PASSWORD", "salasana"), + MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: getEnv("MQTT_USERNAME", ""), + MQTTPassword: getEnv("MQTT_PASSWORD", ""), LogLevel: getEnv("LOG_LEVEL", "info"), HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), - MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")), - RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")), } + return cfg } func getEnv(key, defaultValue string) string { - if value, exists := os.LookupEnv(key); exists { + if value := os.Getenv(key); value != "" { return value } return defaultValue @@ -212,7 +126,7 @@ func getEnv(key, defaultValue string) string { func parseDuration(s string) time.Duration { d, err := time.ParseDuration(s) if err != nil { - panic(fmt.Sprintf("invalid duration %s: %v", s, err)) + return 5 * time.Minute // Default fallback } return d } @@ -220,7 +134,7 @@ func parseDuration(s string) time.Duration { func parseInt(s string) int { i, err := strconv.Atoi(s) if err != nil { - panic(fmt.Sprintf("invalid int %s: %v", s, err)) + return 0 } return i } @@ -228,7 +142,7 @@ func parseInt(s string) int { func parseFloat(s string) float64 { f, err := strconv.ParseFloat(s, 64) if err != nil { - panic(fmt.Sprintf("invalid float %s: %v", s, err)) + return 0 } return f } @@ -253,7 +167,6 @@ func parseLogLevel(level string) slog.Level { func loadWatermark(filename string) time.Time { b, err := os.ReadFile(filename) if err != nil { - // If file doesn't exist, start from 24 hours ago return time.Now().UTC().Add(-24 * time.Hour) } sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) @@ -269,90 +182,51 @@ func storeWatermark(filename string, t time.Time) error { } /* ============================================================ - Retry helper + HTTP Fetch ============================================================ */ -func withRetry(app *App, name string, maxRetries int, fn func() error) error { - var lastErr error - for i := 0; i < maxRetries; i++ { - if err := fn(); err != nil { - lastErr = err - app.logger.Warn("Operation failed, retrying", - "operation", name, - "attempt", i+1, - "max_attempts", maxRetries, - "error", err) - - if i < maxRetries-1 { - select { - case <-time.After(app.config.RetryDelay): - continue - case <-app.shutdownCtx.Done(): - return app.shutdownCtx.Err() - } - } - } else { - return nil - } - } - return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr) -} - -/* ============================================================ - HTTP helpers with retry - ============================================================ */ +func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() -func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) { - var body io.ReadCloser - err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error { - req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil) - if err != nil { - return err - } - req.Header.Set("Accept", "application/xml") - req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0") - - resp, err := app.httpClient.Do(req) - if err != nil { - return err - } - - if resp.StatusCode != http.StatusOK { - resp.Body.Close() - return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) - } - - body = resp.Body - return nil - }) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/xml") + resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } - return body, nil + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + return resp.Body, nil } /* ============================================================ - Observation polling + Observation Polling ============================================================ */ -func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) { +func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") - q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID)) + q.Set("fmisid", fmt.Sprintf("%d", stationID)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) - q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep)) + q.Set("timestep", fmt.Sprintf("%d", timestep)) - url := app.config.FMIEndpoint + "?" + q.Encode() - app.logger.Debug("Fetching observations", "url", url) - return app.fetchWithRetry(url) + return endpoint + "?" + q.Encode() } -func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) { +func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { dec := xml.NewDecoder(r) var ( @@ -360,7 +234,9 @@ func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation ts time.Time maxObserved = minTime out []Observation - parseErr error + inTVP bool + inTime bool + inValue bool ) for { @@ -378,76 +254,96 @@ func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { - param = normalizeParameter(a.Value) + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } } } + case "MeasurementTVP": + inTVP = true + ts = time.Time{} case "time": - var s string - if err := dec.DecodeElement(&s, &el); err != nil { - parseErr = fmt.Errorf("failed to decode time: %w", err) - continue - } - ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) - if err != nil { - parseErr = fmt.Errorf("failed to parse time %s: %w", s, err) - continue + if inTVP { + inTime = true } case "value": - var v float64 - if err := dec.DecodeElement(&v, &el); err != nil { - parseErr = fmt.Errorf("failed to decode value: %w", err) - continue + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + ts = parsedTime } - if ts.After(minTime) { - out = append(out, Observation{ - Station: -1, // Will be set by caller - Parameter: param, - Time: ts, - Value: v, - }) - if ts.After(maxObserved) { - maxObserved = ts + } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value } } + + out = append(out, Observation{ + Station: stationID, + Parameter: param, + Time: ts, + Value: valuePtr, + }) + + if ts.After(maxObserved) { + maxObserved = ts + } } - } - } - if parseErr != nil { - app.logger.Warn("Partial parse errors", "error", parseErr) + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } } return out, maxObserved, nil } /* ============================================================ - Forecast polling + Forecast Polling ============================================================ */ -func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) { +func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { q := url.Values{} q.Set("service", "WFS") q.Set("version", "2.0.0") q.Set("request", "GetFeature") q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") - q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) q.Set("starttime", start.Format(time.RFC3339)) q.Set("endtime", end.Format(time.RFC3339)) - url := app.config.FMIEndpoint + "?" + q.Encode() - app.logger.Debug("Fetching forecast", "url", url) - return app.fetchWithRetry(url) + return endpoint + "?" + q.Encode() } -func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) { +func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { dec := xml.NewDecoder(r) var ( - param string - runTime time.Time - fcTime time.Time - out []ForecastValue - parseErr error + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + inTVP bool + inTime bool + inValue bool ) for { @@ -464,254 +360,294 @@ func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) { switch el.Name.Local { case "resultTime": var s string - if err := dec.DecodeElement(&s, &el); err != nil { - parseErr = fmt.Errorf("failed to decode resultTime: %w", err) - continue - } - runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) - if err != nil { - parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err) - continue + if err := dec.DecodeElement(&s, &el); err == nil { + if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { + runTime = parsedTime + } } case "MeasurementTimeseries": for _, a := range el.Attr { if a.Name.Local == "id" { - param = normalizeParameter(a.Value) + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } } } + case "MeasurementTVP": + inTVP = true + fcTime = time.Time{} case "time": - var s string - if err := dec.DecodeElement(&s, &el); err != nil { - parseErr = fmt.Errorf("failed to decode forecast time: %w", err) - continue - } - fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) - if err != nil { - parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err) - continue + if inTVP { + inTime = true } case "value": - var v float64 - if err := dec.DecodeElement(&v, &el); err != nil { - parseErr = fmt.Errorf("failed to decode forecast value: %w", err) - continue + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + fcTime = parsedTime } - out = append(out, ForecastValue{ + } else if inValue && inTVP && !fcTime.IsZero() { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + fc := ForecastValue{ + Location: struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + }{lat, lon}, Model: "harmonie", RunTime: runTime, ForecastTime: fcTime, Parameter: param, - Value: v, - }) + Value: valuePtr, + } + out = append(out, fc) } - } - } - if parseErr != nil { - app.logger.Warn("Partial parse errors in forecast", "error", parseErr) + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } } return out, runTime, nil } /* ============================================================ - Polling workers + MQTT Publishing ============================================================ */ -func (app *App) runObservationPoller() { - defer app.wg.Done() +func createMQTTClient(cfg *Config) (mqtt.Client, error) { + opts := mqtt.NewClientOptions(). + AddBroker(cfg.MQTTBroker). + SetClientID(fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())). + SetCleanSession(true) - app.logger.Info("Starting observation poller", - "poll_interval", app.config.ObsPollEvery, - "station", app.config.StationID) + if cfg.MQTTUsername != "" { + opts.SetUsername(cfg.MQTTUsername) + if cfg.MQTTPassword != "" { + opts.SetPassword(cfg.MQTTPassword) + } + } - ticker := time.NewTicker(app.config.ObsPollEvery) - defer ticker.Stop() + client := mqtt.NewClient(opts) + token := client.Connect() - for { - select { - case <-app.shutdownCtx.Done(): - app.logger.Info("Observation poller stopping") - return - case <-ticker.C: - app.pollObservations() - } + if !token.WaitTimeout(cfg.HTTPTimeout) { + return nil, fmt.Errorf("MQTT connection timeout") + } + + if err := token.Error(); err != nil { + return nil, err } + + return client, nil } -func (app *App) pollObservations() { - startTime := time.Now() - lastObs := app.lastObsWatermark.Load().(time.Time) - obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag) +func publishObservation(client mqtt.Client, obs Observation, timeout time.Duration) error { + topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) - if !obsEnd.After(lastObs) { - return // Nothing new to fetch + // Safe JSON structure + type SafeObservation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value *JSONFloat64 `json:"value,omitempty"` } - app.logger.Info("Polling observations", - "start", lastObs, - "end", obsEnd) + var safeValue *JSONFloat64 + if obs.Value != nil { + v := JSONFloat64(*obs.Value) + safeValue = &v + } - body, err := app.fetchObservations(lastObs, obsEnd) - if err != nil { - app.logger.Error("Failed to fetch observations", "error", err) - return + safeObs := SafeObservation{ + Station: obs.Station, + Parameter: obs.Parameter, + Time: obs.Time, + Value: safeValue, } - defer body.Close() - obs, maxT, err := app.parseObservations(body, lastObs) + b, err := json.Marshal(safeObs) if err != nil { - app.logger.Error("Failed to parse observations", "error", err) - return + return err } - // Set station ID for all observations - for i := range obs { - obs[i].Station = app.config.StationID + token := client.Publish(topic, 1, false, b) + if !token.WaitTimeout(timeout) { + return fmt.Errorf("publish timeout") } + return token.Error() +} - if len(obs) > 0 { - if err := app.publishObs(obs); err != nil { - app.logger.Error("Failed to publish observations", "error", err) - return - } - - app.lastObsWatermark.Store(maxT) - if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil { - app.logger.Error("Failed to store watermark", "error", err) - } +func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration) error { + topic := fmt.Sprintf( + "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", + fc.RunTime.Format(time.RFC3339), + fc.Parameter, + ) - app.logger.Info("Published observations", - "count", len(obs), - "new_watermark", maxT, - "duration", time.Since(startTime)) - } else { - app.logger.Debug("No new observations") + // Safe JSON structure + type SafeForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value *JSONFloat64 `json:"value,omitempty"` } -} -func (app *App) runForecastPoller() { - defer app.wg.Done() + var safeValue *JSONFloat64 + if fc.Value != nil { + v := JSONFloat64(*fc.Value) + safeValue = &v + } - app.logger.Info("Starting forecast poller", - "poll_interval", app.config.FcPollEvery, - "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + safeFc := SafeForecastValue{ + Location: fc.Location, + Model: fc.Model, + RunTime: fc.RunTime, + ForecastTime: fc.ForecastTime, + Parameter: fc.Parameter, + Value: safeValue, + } - ticker := time.NewTicker(app.config.FcPollEvery) - defer ticker.Stop() + b, err := json.Marshal(safeFc) + if err != nil { + return err + } - for { - select { - case <-app.shutdownCtx.Done(): - app.logger.Info("Forecast poller stopping") - return - case <-ticker.C: - app.pollForecast() - } + token := client.Publish(topic, 1, true, b) + if !token.WaitTimeout(timeout) { + return fmt.Errorf("publish timeout") } + return token.Error() } -func (app *App) pollForecast() { - startTime := time.Now() - now := time.Now().UTC() - start := now - end := now.Add(48 * time.Hour) +/* ============================================================ + Single Poller + ============================================================ */ + +func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { + // Load initial state + lastObs := loadWatermark(cfg.WatermarkFile) + lastFcRun := time.Time{} - app.logger.Info("Polling forecast", - "start", start, - "end", end) + // Create timers + obsTimer := time.NewTimer(0) // Start immediately + fcTimer := time.NewTimer(0) // Start immediately - body, err := app.fetchForecast(start, end) - if err != nil { - app.logger.Error("Failed to fetch forecast", "error", err) - return - } - defer body.Close() + defer obsTimer.Stop() + defer fcTimer.Stop() - fc, runTime, err := app.parseForecast(body) + // Create MQTT client + mqttClient, err := createMQTTClient(cfg) if err != nil { - app.logger.Error("Failed to parse forecast", "error", err) - return + return fmt.Errorf("failed to connect to MQTT: %w", err) } + defer mqttClient.Disconnect(250) - // Set location for all forecasts - for i := range fc { - fc[i].Location.Lat = app.config.HelLat - fc[i].Location.Lon = app.config.HelLon - } + logger.Info("Poller started", + "obs_interval", cfg.ObsPollEvery, + "fc_interval", cfg.FcPollEvery) - if len(fc) > 0 { - // Check if this is a new forecast run - lastRun := app.lastForecastRun.Load().(time.Time) - if runTime.After(lastRun) { - if err := app.publishForecast(fc); err != nil { - app.logger.Error("Failed to publish forecast", "error", err) - return - } + for { + select { + case <-ctx.Done(): + logger.Info("Poller stopping") + return nil - app.lastForecastRun.Store(runTime) - app.logger.Info("Published forecast", - "count", len(fc), - "run_time", runTime, - "duration", time.Since(startTime)) - } else { - app.logger.Debug("Forecast run already published", "run_time", runTime) - } - } -} + case <-obsTimer.C: + // Poll observations + obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) + if obsEnd.After(lastObs) { + logger.Info("Polling observations", "from", lastObs, "to", obsEnd) -/* ============================================================ - MQTT publishing with error handling - ============================================================ */ + url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) + body, err := fetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch observations", "error", err) + } else { + obs, maxT, err := parseObservations(body, cfg.StationID, lastObs, logger) + body.Close() + + if err != nil { + logger.Error("Failed to parse observations", "error", err) + } else if len(obs) > 0 { + // Publish observations + for _, o := range obs { + if err := publishObservation(mqttClient, o, cfg.HTTPTimeout); err != nil { + logger.Error("Failed to publish observation", "error", err) + } + } + + // Update watermark + lastObs = maxT + if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { + logger.Error("Failed to store watermark", "error", err) + } + + logger.Info("Published observations", "count", len(obs), "new_watermark", lastObs) + } + } + } + obsTimer.Reset(cfg.ObsPollEvery) -func (app *App) publishObs(obs []Observation) error { - for _, o := range obs { - topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter) - b, err := json.Marshal(o) - if err != nil { - return fmt.Errorf("failed to marshal observation: %w", err) - } + case <-fcTimer.C: + // Poll forecast + now := time.Now().UTC() + start := now + end := now.Add(48 * time.Hour) - token := app.mqttClient.Publish(topic, 1, false, b) - if !token.WaitTimeout(app.config.HTTPTimeout) { - return errors.New("MQTT publish timeout") - } - if err := token.Error(); err != nil { - return fmt.Errorf("failed to publish observation: %w", err) - } - } - return nil -} + logger.Info("Polling forecast", "from", start, "to", end) -func (app *App) publishForecast(fc []ForecastValue) error { - for _, v := range fc { - topic := fmt.Sprintf( - "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", - v.RunTime.Format(time.RFC3339), - v.Parameter, - ) - b, err := json.Marshal(v) - if err != nil { - return fmt.Errorf("failed to marshal forecast: %w", err) - } + url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) + body, err := fetchData(url, cfg.HTTPTimeout) + if err != nil { + logger.Error("Failed to fetch forecast", "error", err) + } else { + fc, runTime, err := parseForecast(body, cfg.HelLat, cfg.HelLon, logger) + body.Close() - token := app.mqttClient.Publish(topic, 1, true, b) - if !token.WaitTimeout(app.config.HTTPTimeout) { - return errors.New("MQTT publish timeout") - } - if err := token.Error(); err != nil { - return fmt.Errorf("failed to publish forecast: %w", err) + if err != nil { + logger.Error("Failed to parse forecast", "error", err) + } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { + // Publish forecast + for _, f := range fc { + if err := publishForecast(mqttClient, f, cfg.HTTPTimeout); err != nil { + logger.Error("Failed to publish forecast", "error", err) + } + } + + lastFcRun = runTime + logger.Info("Published forecast", "count", len(fc), "run_time", runTime) + } + } + fcTimer.Reset(cfg.FcPollEvery) } } - return nil -} - -/* ============================================================ - Utilities - ============================================================ */ - -func normalizeParameter(id string) string { - return strings.TrimPrefix(strings.ToLower(id), "ts_") } /* ============================================================ @@ -719,51 +655,24 @@ func normalizeParameter(id string) string { ============================================================ */ func main() { - // Create and initialize app - app, err := NewApp() - if err != nil { - panic(fmt.Sprintf("Failed to initialize app: %v", err)) - } - defer app.shutdownCancel() + // Load configuration + cfg := loadConfig() + + // Setup logging + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: parseLogLevel(cfg.LogLevel), + })) - // Setup signal handling - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Connect to MQTT - if err := app.ConnectMQTT(); err != nil { - app.logger.Error("Failed to connect to MQTT", "error", err) + // Run the poller + logger.Info("Starting weather data poller") + if err := runPoller(ctx, cfg, logger); err != nil { + logger.Error("Poller failed", "error", err) os.Exit(1) } - defer app.mqttClient.Disconnect(250) - - // Start pollers - app.wg.Add(2) - go app.runObservationPoller() - go app.runForecastPoller() - - app.logger.Info("Weather poller started") - - // Wait for shutdown signal - select { - case sig := <-signalChan: - app.logger.Info("Received shutdown signal", "signal", sig) - app.shutdownCancel() - case <-app.shutdownCtx.Done(): - app.logger.Info("Shutdown initiated") - } - // Wait for graceful shutdown - shutdownDone := make(chan struct{}) - go func() { - app.wg.Wait() - close(shutdownDone) - }() - - select { - case <-shutdownDone: - app.logger.Info("Shutdown completed") - case <-time.After(30 * time.Second): - app.logger.Warn("Shutdown timed out") - } + logger.Info("Poller stopped gracefully") } |
