aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPetri Hienonen <petri.hienonen@gmail.com>2026-01-03 11:31:14 +0200
committerPetri Hienonen <petri.hienonen@gmail.com>2026-01-03 11:31:14 +0200
commit0e6085003e4b156595c039411855aec5b4b94623 (patch)
tree6f4a940d50c41e0d5df7cfd003de5880b3e7de54
parentefa05d7633e97832e28d4e051b42c92d01026514 (diff)
downloadpub-0e6085003e4b156595c039411855aec5b4b94623.tar.zst
Third commit
-rw-r--r--go.mod5
-rw-r--r--go.sum2
-rw-r--r--main.go811
3 files changed, 366 insertions, 452 deletions
diff --git a/go.mod b/go.mod
index 54112f1..35a0f36 100644
--- a/go.mod
+++ b/go.mod
@@ -2,7 +2,10 @@ module pub
go 1.25.5
-require github.com/eclipse/paho.mqtt.golang v1.5.1
+require (
+ github.com/eclipse/paho.mqtt.golang v1.5.1
+ github.com/joho/godotenv v1.5.1
+)
require (
github.com/gorilla/websocket v1.5.3 // indirect
diff --git a/go.sum b/go.sum
index 586bb58..50fe7b4 100644
--- a/go.sum
+++ b/go.sum
@@ -2,6 +2,8 @@ github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2I
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
+github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
diff --git a/main.go b/main.go
index 4a3ad9e..4c4f302 100644
--- a/main.go
+++ b/main.go
@@ -4,75 +4,50 @@ import (
"context"
"encoding/json"
"encoding/xml"
- "errors"
"fmt"
"io"
"log/slog"
+ "math"
"net/http"
"net/url"
"os"
- "os/signal"
"strconv"
"strings"
- "sync"
- "sync/atomic"
- "syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
+ "github.com/joho/godotenv"
)
/* ============================================================
- Configuration (now configurable)
+ Configuration
============================================================ */
type Config struct {
// FMI API
- FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"`
+ FMIEndpoint string `env:"FMI_ENDPOINT"`
// Observation
- StationID int `env:"STATION_ID" default:"100968"`
- ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"`
- ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"`
- ObsTimestep int `env:"OBS_TIMESTEP" default:"60"`
- WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"`
+ StationID int `env:"STATION_ID"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
+ ObsTimestep int `env:"OBS_TIMESTEP"`
+ WatermarkFile string `env:"WATERMARK_FILE"`
// Forecast
- FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"`
- HelLat float64 `env:"HEL_LAT" default:"60.1699"`
- HelLon float64 `env:"HEL_LON" default:"24.9384"`
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
+ HelLat float64 `env:"HEL_LAT"`
+ HelLon float64 `env:"HEL_LON"`
// MQTT
- MQTTBroker string `env:"MQTT_BROKER" default:"tcp://tammi.cc:1883"`
- MQTTClientID string `env:"MQTT_CLIENT_ID" default:"pub"`
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
MQTTUsername string `env:"MQTT_USERNAME"`
MQTTPassword string `env:"MQTT_PASSWORD"`
// Application
- LogLevel string `env:"LOG_LEVEL" default:"info"`
- HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"`
- MaxRetries int `env:"MAX_RETRIES" default:"3"`
- RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"`
-}
-
-/* ============================================================
- Application State
- ============================================================ */
-
-type App struct {
- config *Config
- logger *slog.Logger
- mqttClient mqtt.Client
- httpClient *http.Client
-
- // State
- lastObsWatermark atomic.Value // stores time.Time
- lastForecastRun atomic.Value // stores time.Time
- shutdownCtx context.Context
- shutdownCancel context.CancelFunc
-
- // Wait group for graceful shutdown
- wg sync.WaitGroup
+ LogLevel string `env:"LOG_LEVEL"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
}
/* ============================================================
@@ -83,7 +58,7 @@ type Observation struct {
Station int `json:"station"`
Parameter string `json:"parameter"`
Time time.Time `json:"time"`
- Value float64 `json:"value"`
+ Value *float64 `json:"value,omitempty"`
}
type ForecastValue struct {
@@ -96,90 +71,30 @@ type ForecastValue struct {
RunTime time.Time `json:"run_time"`
ForecastTime time.Time `json:"forecast_time"`
Parameter string `json:"parameter"`
- Value float64 `json:"value"`
+ Value *float64 `json:"value,omitempty"`
}
/* ============================================================
- Initialization
+ JSON Float for NaN handling
============================================================ */
-func NewApp() (*App, error) {
- config := loadConfig()
-
- // Setup logging
- logLevel := parseLogLevel(config.LogLevel)
- logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
- Level: logLevel,
- }))
-
- // Create HTTP client with timeout
- httpClient := &http.Client{
- Timeout: config.HTTPTimeout,
- Transport: &http.Transport{
- MaxIdleConns: 10,
- IdleConnTimeout: 30 * time.Second,
- DisableCompression: false,
- },
- }
-
- // Create MQTT client
- mqttOpts := mqtt.NewClientOptions().
- AddBroker(config.MQTTBroker).
- SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())).
- SetCleanSession(true).
- SetAutoReconnect(true).
- SetMaxReconnectInterval(10 * time.Second).
- SetConnectionLostHandler(func(c mqtt.Client, err error) {
- logger.Error("MQTT connection lost", "error", err)
- }).
- SetOnConnectHandler(func(c mqtt.Client) {
- logger.Info("MQTT connected successfully")
- })
-
- if config.MQTTUsername != "" {
- mqttOpts.SetUsername(config.MQTTUsername)
- if config.MQTTPassword != "" {
- mqttOpts.SetPassword(config.MQTTPassword)
- }
- }
-
- mqttClient := mqtt.NewClient(mqttOpts)
-
- ctx, cancel := context.WithCancel(context.Background())
-
- app := &App{
- config: config,
- logger: logger,
- httpClient: httpClient,
- mqttClient: mqttClient,
- shutdownCtx: ctx,
- shutdownCancel: cancel,
- }
-
- // Initialize atomic values
- app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile))
- app.lastForecastRun.Store(time.Time{})
-
- return app, nil
-}
+type JSONFloat64 float64
-func (app *App) ConnectMQTT() error {
- token := app.mqttClient.Connect()
- if !token.WaitTimeout(app.config.HTTPTimeout) {
- return errors.New("MQTT connection timeout")
- }
- if err := token.Error(); err != nil {
- return fmt.Errorf("MQTT connection failed: %w", err)
+func (f JSONFloat64) MarshalJSON() ([]byte, error) {
+ if math.IsNaN(float64(f)) {
+ return []byte("null"), nil
}
- app.logger.Info("Connected to MQTT broker")
- return nil
+ return json.Marshal(float64(f))
}
/* ============================================================
- Configuration loading
+ Load Configuration
============================================================ */
func loadConfig() *Config {
+ // Load .env file if it exists
+ _ = godotenv.Load()
+
cfg := &Config{
FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
StationID: parseInt(getEnv("STATION_ID", "100968")),
@@ -190,20 +105,19 @@ func loadConfig() *Config {
FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
- MQTTBroker: getEnv("MQTT_BROKER", "tcp://tammi.cc:1883"),
- MQTTClientID: getEnv("MQTT_CLIENT_ID", "pub"),
- MQTTUsername: getEnv("MQTT_USERNAME", "host"),
- MQTTPassword: getEnv("MQTT_PASSWORD", "salasana"),
+ MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: getEnv("MQTT_USERNAME", ""),
+ MQTTPassword: getEnv("MQTT_PASSWORD", ""),
LogLevel: getEnv("LOG_LEVEL", "info"),
HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
- MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")),
- RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")),
}
+
return cfg
}
func getEnv(key, defaultValue string) string {
- if value, exists := os.LookupEnv(key); exists {
+ if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
@@ -212,7 +126,7 @@ func getEnv(key, defaultValue string) string {
func parseDuration(s string) time.Duration {
d, err := time.ParseDuration(s)
if err != nil {
- panic(fmt.Sprintf("invalid duration %s: %v", s, err))
+ return 5 * time.Minute // Default fallback
}
return d
}
@@ -220,7 +134,7 @@ func parseDuration(s string) time.Duration {
func parseInt(s string) int {
i, err := strconv.Atoi(s)
if err != nil {
- panic(fmt.Sprintf("invalid int %s: %v", s, err))
+ return 0
}
return i
}
@@ -228,7 +142,7 @@ func parseInt(s string) int {
func parseFloat(s string) float64 {
f, err := strconv.ParseFloat(s, 64)
if err != nil {
- panic(fmt.Sprintf("invalid float %s: %v", s, err))
+ return 0
}
return f
}
@@ -253,7 +167,6 @@ func parseLogLevel(level string) slog.Level {
func loadWatermark(filename string) time.Time {
b, err := os.ReadFile(filename)
if err != nil {
- // If file doesn't exist, start from 24 hours ago
return time.Now().UTC().Add(-24 * time.Hour)
}
sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
@@ -269,90 +182,51 @@ func storeWatermark(filename string, t time.Time) error {
}
/* ============================================================
- Retry helper
+ HTTP Fetch
============================================================ */
-func withRetry(app *App, name string, maxRetries int, fn func() error) error {
- var lastErr error
- for i := 0; i < maxRetries; i++ {
- if err := fn(); err != nil {
- lastErr = err
- app.logger.Warn("Operation failed, retrying",
- "operation", name,
- "attempt", i+1,
- "max_attempts", maxRetries,
- "error", err)
-
- if i < maxRetries-1 {
- select {
- case <-time.After(app.config.RetryDelay):
- continue
- case <-app.shutdownCtx.Done():
- return app.shutdownCtx.Err()
- }
- }
- } else {
- return nil
- }
- }
- return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr)
-}
-
-/* ============================================================
- HTTP helpers with retry
- ============================================================ */
+func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
-func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) {
- var body io.ReadCloser
- err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error {
- req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil)
- if err != nil {
- return err
- }
- req.Header.Set("Accept", "application/xml")
- req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0")
-
- resp, err := app.httpClient.Do(req)
- if err != nil {
- return err
- }
-
- if resp.StatusCode != http.StatusOK {
- resp.Body.Close()
- return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
- }
-
- body = resp.Body
- return nil
- })
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Accept", "application/xml")
+ resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
- return body, nil
+
+ if resp.StatusCode != http.StatusOK {
+ resp.Body.Close()
+ return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
+ }
+
+ return resp.Body, nil
}
/* ============================================================
- Observation polling
+ Observation Polling
============================================================ */
-func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) {
+func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
q := url.Values{}
q.Set("service", "WFS")
q.Set("version", "2.0.0")
q.Set("request", "GetFeature")
q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
- q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID))
+ q.Set("fmisid", fmt.Sprintf("%d", stationID))
q.Set("starttime", start.Format(time.RFC3339))
q.Set("endtime", end.Format(time.RFC3339))
- q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep))
+ q.Set("timestep", fmt.Sprintf("%d", timestep))
- url := app.config.FMIEndpoint + "?" + q.Encode()
- app.logger.Debug("Fetching observations", "url", url)
- return app.fetchWithRetry(url)
+ return endpoint + "?" + q.Encode()
}
-func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) {
+func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) {
dec := xml.NewDecoder(r)
var (
@@ -360,7 +234,9 @@ func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation
ts time.Time
maxObserved = minTime
out []Observation
- parseErr error
+ inTVP bool
+ inTime bool
+ inValue bool
)
for {
@@ -378,76 +254,96 @@ func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation
case "MeasurementTimeseries":
for _, a := range el.Attr {
if a.Name.Local == "id" {
- param = normalizeParameter(a.Value)
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
}
}
+ case "MeasurementTVP":
+ inTVP = true
+ ts = time.Time{}
case "time":
- var s string
- if err := dec.DecodeElement(&s, &el); err != nil {
- parseErr = fmt.Errorf("failed to decode time: %w", err)
- continue
- }
- ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
- if err != nil {
- parseErr = fmt.Errorf("failed to parse time %s: %w", s, err)
- continue
+ if inTVP {
+ inTime = true
}
case "value":
- var v float64
- if err := dec.DecodeElement(&v, &el); err != nil {
- parseErr = fmt.Errorf("failed to decode value: %w", err)
- continue
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ ts = parsedTime
}
- if ts.After(minTime) {
- out = append(out, Observation{
- Station: -1, // Will be set by caller
- Parameter: param,
- Time: ts,
- Value: v,
- })
- if ts.After(maxObserved) {
- maxObserved = ts
+ } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
}
}
+
+ out = append(out, Observation{
+ Station: stationID,
+ Parameter: param,
+ Time: ts,
+ Value: valuePtr,
+ })
+
+ if ts.After(maxObserved) {
+ maxObserved = ts
+ }
}
- }
- }
- if parseErr != nil {
- app.logger.Warn("Partial parse errors", "error", parseErr)
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
}
return out, maxObserved, nil
}
/* ============================================================
- Forecast polling
+ Forecast Polling
============================================================ */
-func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) {
+func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
q := url.Values{}
q.Set("service", "WFS")
q.Set("version", "2.0.0")
q.Set("request", "GetFeature")
q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
- q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon))
+ q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon))
q.Set("starttime", start.Format(time.RFC3339))
q.Set("endtime", end.Format(time.RFC3339))
- url := app.config.FMIEndpoint + "?" + q.Encode()
- app.logger.Debug("Fetching forecast", "url", url)
- return app.fetchWithRetry(url)
+ return endpoint + "?" + q.Encode()
}
-func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) {
+func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) {
dec := xml.NewDecoder(r)
var (
- param string
- runTime time.Time
- fcTime time.Time
- out []ForecastValue
- parseErr error
+ param string
+ runTime time.Time
+ fcTime time.Time
+ out []ForecastValue
+ inTVP bool
+ inTime bool
+ inValue bool
)
for {
@@ -464,254 +360,294 @@ func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) {
switch el.Name.Local {
case "resultTime":
var s string
- if err := dec.DecodeElement(&s, &el); err != nil {
- parseErr = fmt.Errorf("failed to decode resultTime: %w", err)
- continue
- }
- runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
- if err != nil {
- parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err)
- continue
+ if err := dec.DecodeElement(&s, &el); err == nil {
+ if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil {
+ runTime = parsedTime
+ }
}
case "MeasurementTimeseries":
for _, a := range el.Attr {
if a.Name.Local == "id" {
- param = normalizeParameter(a.Value)
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
}
}
+ case "MeasurementTVP":
+ inTVP = true
+ fcTime = time.Time{}
case "time":
- var s string
- if err := dec.DecodeElement(&s, &el); err != nil {
- parseErr = fmt.Errorf("failed to decode forecast time: %w", err)
- continue
- }
- fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
- if err != nil {
- parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err)
- continue
+ if inTVP {
+ inTime = true
}
case "value":
- var v float64
- if err := dec.DecodeElement(&v, &el); err != nil {
- parseErr = fmt.Errorf("failed to decode forecast value: %w", err)
- continue
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ fcTime = parsedTime
}
- out = append(out, ForecastValue{
+ } else if inValue && inTVP && !fcTime.IsZero() {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
+ }
+ }
+
+ fc := ForecastValue{
+ Location: struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ }{lat, lon},
Model: "harmonie",
RunTime: runTime,
ForecastTime: fcTime,
Parameter: param,
- Value: v,
- })
+ Value: valuePtr,
+ }
+ out = append(out, fc)
}
- }
- }
- if parseErr != nil {
- app.logger.Warn("Partial parse errors in forecast", "error", parseErr)
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
}
return out, runTime, nil
}
/* ============================================================
- Polling workers
+ MQTT Publishing
============================================================ */
-func (app *App) runObservationPoller() {
- defer app.wg.Done()
+func createMQTTClient(cfg *Config) (mqtt.Client, error) {
+ opts := mqtt.NewClientOptions().
+ AddBroker(cfg.MQTTBroker).
+ SetClientID(fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())).
+ SetCleanSession(true)
- app.logger.Info("Starting observation poller",
- "poll_interval", app.config.ObsPollEvery,
- "station", app.config.StationID)
+ if cfg.MQTTUsername != "" {
+ opts.SetUsername(cfg.MQTTUsername)
+ if cfg.MQTTPassword != "" {
+ opts.SetPassword(cfg.MQTTPassword)
+ }
+ }
- ticker := time.NewTicker(app.config.ObsPollEvery)
- defer ticker.Stop()
+ client := mqtt.NewClient(opts)
+ token := client.Connect()
- for {
- select {
- case <-app.shutdownCtx.Done():
- app.logger.Info("Observation poller stopping")
- return
- case <-ticker.C:
- app.pollObservations()
- }
+ if !token.WaitTimeout(cfg.HTTPTimeout) {
+ return nil, fmt.Errorf("MQTT connection timeout")
+ }
+
+ if err := token.Error(); err != nil {
+ return nil, err
}
+
+ return client, nil
}
-func (app *App) pollObservations() {
- startTime := time.Now()
- lastObs := app.lastObsWatermark.Load().(time.Time)
- obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag)
+func publishObservation(client mqtt.Client, obs Observation, timeout time.Duration) error {
+ topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
- if !obsEnd.After(lastObs) {
- return // Nothing new to fetch
+ // Safe JSON structure
+ type SafeObservation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value *JSONFloat64 `json:"value,omitempty"`
}
- app.logger.Info("Polling observations",
- "start", lastObs,
- "end", obsEnd)
+ var safeValue *JSONFloat64
+ if obs.Value != nil {
+ v := JSONFloat64(*obs.Value)
+ safeValue = &v
+ }
- body, err := app.fetchObservations(lastObs, obsEnd)
- if err != nil {
- app.logger.Error("Failed to fetch observations", "error", err)
- return
+ safeObs := SafeObservation{
+ Station: obs.Station,
+ Parameter: obs.Parameter,
+ Time: obs.Time,
+ Value: safeValue,
}
- defer body.Close()
- obs, maxT, err := app.parseObservations(body, lastObs)
+ b, err := json.Marshal(safeObs)
if err != nil {
- app.logger.Error("Failed to parse observations", "error", err)
- return
+ return err
}
- // Set station ID for all observations
- for i := range obs {
- obs[i].Station = app.config.StationID
+ token := client.Publish(topic, 1, false, b)
+ if !token.WaitTimeout(timeout) {
+ return fmt.Errorf("publish timeout")
}
+ return token.Error()
+}
- if len(obs) > 0 {
- if err := app.publishObs(obs); err != nil {
- app.logger.Error("Failed to publish observations", "error", err)
- return
- }
-
- app.lastObsWatermark.Store(maxT)
- if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil {
- app.logger.Error("Failed to store watermark", "error", err)
- }
+func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration) error {
+ topic := fmt.Sprintf(
+ "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
+ fc.RunTime.Format(time.RFC3339),
+ fc.Parameter,
+ )
- app.logger.Info("Published observations",
- "count", len(obs),
- "new_watermark", maxT,
- "duration", time.Since(startTime))
- } else {
- app.logger.Debug("No new observations")
+ // Safe JSON structure
+ type SafeForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value *JSONFloat64 `json:"value,omitempty"`
}
-}
-func (app *App) runForecastPoller() {
- defer app.wg.Done()
+ var safeValue *JSONFloat64
+ if fc.Value != nil {
+ v := JSONFloat64(*fc.Value)
+ safeValue = &v
+ }
- app.logger.Info("Starting forecast poller",
- "poll_interval", app.config.FcPollEvery,
- "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon))
+ safeFc := SafeForecastValue{
+ Location: fc.Location,
+ Model: fc.Model,
+ RunTime: fc.RunTime,
+ ForecastTime: fc.ForecastTime,
+ Parameter: fc.Parameter,
+ Value: safeValue,
+ }
- ticker := time.NewTicker(app.config.FcPollEvery)
- defer ticker.Stop()
+ b, err := json.Marshal(safeFc)
+ if err != nil {
+ return err
+ }
- for {
- select {
- case <-app.shutdownCtx.Done():
- app.logger.Info("Forecast poller stopping")
- return
- case <-ticker.C:
- app.pollForecast()
- }
+ token := client.Publish(topic, 1, true, b)
+ if !token.WaitTimeout(timeout) {
+ return fmt.Errorf("publish timeout")
}
+ return token.Error()
}
-func (app *App) pollForecast() {
- startTime := time.Now()
- now := time.Now().UTC()
- start := now
- end := now.Add(48 * time.Hour)
+/* ============================================================
+ Single Poller
+ ============================================================ */
+
+func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
+ // Load initial state
+ lastObs := loadWatermark(cfg.WatermarkFile)
+ lastFcRun := time.Time{}
- app.logger.Info("Polling forecast",
- "start", start,
- "end", end)
+ // Create timers
+ obsTimer := time.NewTimer(0) // Start immediately
+ fcTimer := time.NewTimer(0) // Start immediately
- body, err := app.fetchForecast(start, end)
- if err != nil {
- app.logger.Error("Failed to fetch forecast", "error", err)
- return
- }
- defer body.Close()
+ defer obsTimer.Stop()
+ defer fcTimer.Stop()
- fc, runTime, err := app.parseForecast(body)
+ // Create MQTT client
+ mqttClient, err := createMQTTClient(cfg)
if err != nil {
- app.logger.Error("Failed to parse forecast", "error", err)
- return
+ return fmt.Errorf("failed to connect to MQTT: %w", err)
}
+ defer mqttClient.Disconnect(250)
- // Set location for all forecasts
- for i := range fc {
- fc[i].Location.Lat = app.config.HelLat
- fc[i].Location.Lon = app.config.HelLon
- }
+ logger.Info("Poller started",
+ "obs_interval", cfg.ObsPollEvery,
+ "fc_interval", cfg.FcPollEvery)
- if len(fc) > 0 {
- // Check if this is a new forecast run
- lastRun := app.lastForecastRun.Load().(time.Time)
- if runTime.After(lastRun) {
- if err := app.publishForecast(fc); err != nil {
- app.logger.Error("Failed to publish forecast", "error", err)
- return
- }
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Info("Poller stopping")
+ return nil
- app.lastForecastRun.Store(runTime)
- app.logger.Info("Published forecast",
- "count", len(fc),
- "run_time", runTime,
- "duration", time.Since(startTime))
- } else {
- app.logger.Debug("Forecast run already published", "run_time", runTime)
- }
- }
-}
+ case <-obsTimer.C:
+ // Poll observations
+ obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag)
+ if obsEnd.After(lastObs) {
+ logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
-/* ============================================================
- MQTT publishing with error handling
- ============================================================ */
+ url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
+ body, err := fetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch observations", "error", err)
+ } else {
+ obs, maxT, err := parseObservations(body, cfg.StationID, lastObs, logger)
+ body.Close()
+
+ if err != nil {
+ logger.Error("Failed to parse observations", "error", err)
+ } else if len(obs) > 0 {
+ // Publish observations
+ for _, o := range obs {
+ if err := publishObservation(mqttClient, o, cfg.HTTPTimeout); err != nil {
+ logger.Error("Failed to publish observation", "error", err)
+ }
+ }
+
+ // Update watermark
+ lastObs = maxT
+ if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil {
+ logger.Error("Failed to store watermark", "error", err)
+ }
+
+ logger.Info("Published observations", "count", len(obs), "new_watermark", lastObs)
+ }
+ }
+ }
+ obsTimer.Reset(cfg.ObsPollEvery)
-func (app *App) publishObs(obs []Observation) error {
- for _, o := range obs {
- topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter)
- b, err := json.Marshal(o)
- if err != nil {
- return fmt.Errorf("failed to marshal observation: %w", err)
- }
+ case <-fcTimer.C:
+ // Poll forecast
+ now := time.Now().UTC()
+ start := now
+ end := now.Add(48 * time.Hour)
- token := app.mqttClient.Publish(topic, 1, false, b)
- if !token.WaitTimeout(app.config.HTTPTimeout) {
- return errors.New("MQTT publish timeout")
- }
- if err := token.Error(); err != nil {
- return fmt.Errorf("failed to publish observation: %w", err)
- }
- }
- return nil
-}
+ logger.Info("Polling forecast", "from", start, "to", end)
-func (app *App) publishForecast(fc []ForecastValue) error {
- for _, v := range fc {
- topic := fmt.Sprintf(
- "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
- v.RunTime.Format(time.RFC3339),
- v.Parameter,
- )
- b, err := json.Marshal(v)
- if err != nil {
- return fmt.Errorf("failed to marshal forecast: %w", err)
- }
+ url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
+ body, err := fetchData(url, cfg.HTTPTimeout)
+ if err != nil {
+ logger.Error("Failed to fetch forecast", "error", err)
+ } else {
+ fc, runTime, err := parseForecast(body, cfg.HelLat, cfg.HelLon, logger)
+ body.Close()
- token := app.mqttClient.Publish(topic, 1, true, b)
- if !token.WaitTimeout(app.config.HTTPTimeout) {
- return errors.New("MQTT publish timeout")
- }
- if err := token.Error(); err != nil {
- return fmt.Errorf("failed to publish forecast: %w", err)
+ if err != nil {
+ logger.Error("Failed to parse forecast", "error", err)
+ } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
+ // Publish forecast
+ for _, f := range fc {
+ if err := publishForecast(mqttClient, f, cfg.HTTPTimeout); err != nil {
+ logger.Error("Failed to publish forecast", "error", err)
+ }
+ }
+
+ lastFcRun = runTime
+ logger.Info("Published forecast", "count", len(fc), "run_time", runTime)
+ }
+ }
+ fcTimer.Reset(cfg.FcPollEvery)
}
}
- return nil
-}
-
-/* ============================================================
- Utilities
- ============================================================ */
-
-func normalizeParameter(id string) string {
- return strings.TrimPrefix(strings.ToLower(id), "ts_")
}
/* ============================================================
@@ -719,51 +655,24 @@ func normalizeParameter(id string) string {
============================================================ */
func main() {
- // Create and initialize app
- app, err := NewApp()
- if err != nil {
- panic(fmt.Sprintf("Failed to initialize app: %v", err))
- }
- defer app.shutdownCancel()
+ // Load configuration
+ cfg := loadConfig()
+
+ // Setup logging
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: parseLogLevel(cfg.LogLevel),
+ }))
- // Setup signal handling
- signalChan := make(chan os.Signal, 1)
- signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+ // Create context for graceful shutdown
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- // Connect to MQTT
- if err := app.ConnectMQTT(); err != nil {
- app.logger.Error("Failed to connect to MQTT", "error", err)
+ // Run the poller
+ logger.Info("Starting weather data poller")
+ if err := runPoller(ctx, cfg, logger); err != nil {
+ logger.Error("Poller failed", "error", err)
os.Exit(1)
}
- defer app.mqttClient.Disconnect(250)
-
- // Start pollers
- app.wg.Add(2)
- go app.runObservationPoller()
- go app.runForecastPoller()
-
- app.logger.Info("Weather poller started")
-
- // Wait for shutdown signal
- select {
- case sig := <-signalChan:
- app.logger.Info("Received shutdown signal", "signal", sig)
- app.shutdownCancel()
- case <-app.shutdownCtx.Done():
- app.logger.Info("Shutdown initiated")
- }
- // Wait for graceful shutdown
- shutdownDone := make(chan struct{})
- go func() {
- app.wg.Wait()
- close(shutdownDone)
- }()
-
- select {
- case <-shutdownDone:
- app.logger.Info("Shutdown completed")
- case <-time.After(30 * time.Second):
- app.logger.Warn("Shutdown timed out")
- }
+ logger.Info("Poller stopped gracefully")
}