aboutsummaryrefslogtreecommitdiffstats
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go347
1 files changed, 230 insertions, 117 deletions
diff --git a/main.go b/main.go
index 4c4f302..a8d635c 100644
--- a/main.go
+++ b/main.go
@@ -1,7 +1,9 @@
package main
import (
+ "bytes"
"context"
+ "crypto/tls"
"encoding/json"
"encoding/xml"
"fmt"
@@ -15,14 +17,11 @@ import (
"strings"
"time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
"github.com/joho/godotenv"
)
-/* ============================================================
- Configuration
- ============================================================ */
-
type Config struct {
// FMI API
FMIEndpoint string `env:"FMI_ENDPOINT"`
@@ -40,20 +39,18 @@ type Config struct {
HelLon float64 `env:"HEL_LON"`
// MQTT
- MQTTBroker string `env:"MQTT_BROKER"`
- MQTTClientID string `env:"MQTT_CLIENT_ID"`
- MQTTUsername string `env:"MQTT_USERNAME"`
- MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
// Application
LogLevel string `env:"LOG_LEVEL"`
HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
}
-/* ============================================================
- Domain models
- ============================================================ */
-
type Observation struct {
Station int `json:"station"`
Parameter string `json:"parameter"`
@@ -74,10 +71,6 @@ type ForecastValue struct {
Value *float64 `json:"value,omitempty"`
}
-/* ============================================================
- JSON Float for NaN handling
- ============================================================ */
-
type JSONFloat64 float64
func (f JSONFloat64) MarshalJSON() ([]byte, error) {
@@ -87,30 +80,28 @@ func (f JSONFloat64) MarshalJSON() ([]byte, error) {
return json.Marshal(float64(f))
}
-/* ============================================================
- Load Configuration
- ============================================================ */
-
func loadConfig() *Config {
// Load .env file if it exists
_ = godotenv.Load()
cfg := &Config{
- FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
- StationID: parseInt(getEnv("STATION_ID", "100968")),
- ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")),
- ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")),
- ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")),
- WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"),
- FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
- HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
- HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
- MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
- MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"),
- MQTTUsername: getEnv("MQTT_USERNAME", ""),
- MQTTPassword: getEnv("MQTT_PASSWORD", ""),
- LogLevel: getEnv("LOG_LEVEL", "info"),
- HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
+ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: parseInt(getEnv("STATION_ID", "100968")),
+ ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
+ HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
+ HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
+ MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: getEnv("MQTT_USERNAME", ""),
+ MQTTPassword: getEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")),
+ LogLevel: getEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
}
return cfg
@@ -160,10 +151,6 @@ func parseLogLevel(level string) slog.Level {
}
}
-/* ============================================================
- Watermark persistence
- ============================================================ */
-
func loadWatermark(filename string) time.Time {
b, err := os.ReadFile(filename)
if err != nil {
@@ -181,11 +168,7 @@ func storeWatermark(filename string, t time.Time) error {
return os.WriteFile(filename, data, 0644)
}
-/* ============================================================
- HTTP Fetch
- ============================================================ */
-
-func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) {
+func fetchData(url string, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
@@ -199,19 +182,15 @@ func fetchData(url string, timeout time.Duration) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
+ defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
- resp.Body.Close()
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
}
- return resp.Body, nil
+ return io.ReadAll(resp.Body)
}
-/* ============================================================
- Observation Polling
- ============================================================ */
-
func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
q := url.Values{}
q.Set("service", "WFS")
@@ -226,8 +205,8 @@ func buildObservationsURL(endpoint string, stationID int, start, end time.Time,
return endpoint + "?" + q.Encode()
}
-func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) {
- dec := xml.NewDecoder(r)
+func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
var (
param string
@@ -316,10 +295,6 @@ func parseObservations(r io.Reader, stationID int, minTime time.Time, logger *sl
return out, maxObserved, nil
}
-/* ============================================================
- Forecast Polling
- ============================================================ */
-
func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
q := url.Values{}
q.Set("service", "WFS")
@@ -333,8 +308,8 @@ func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) s
return endpoint + "?" + q.Encode()
}
-func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) {
- dec := xml.NewDecoder(r)
+func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
var (
param string
@@ -432,38 +407,86 @@ func parseForecast(r io.Reader, lat, lon float64, logger *slog.Logger) ([]Foreca
return out, runTime, nil
}
-/* ============================================================
- MQTT Publishing
- ============================================================ */
+func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) {
+ serverURL, err := url.Parse(cfg.MQTTBroker)
+ if err != nil {
+ return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ // Client configuration
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()),
+ OnPublishReceived: []func(paho.PublishReceived) (bool, error){
+ func(pr paho.PublishReceived) (bool, error) {
+ logger.Debug("Received message", "topic", pr.Packet.Topic)
+ return true, nil
+ },
+ },
+ // Set OnDisconnect here
+ OnServerDisconnect: func(d *paho.Disconnect) {
+ if d != nil {
+ logger.Warn("MQTT disconnected", "reason", d.ReasonCode)
+ } else {
+ logger.Info("MQTT disconnected gracefully")
+ }
+ },
+ },
+ ConnectUsername: cfg.MQTTUsername,
+ ConnectPassword: []byte(cfg.MQTTPassword),
+ ConnectTimeout: cfg.HTTPTimeout,
+ }
-func createMQTTClient(cfg *Config) (mqtt.Client, error) {
- opts := mqtt.NewClientOptions().
- AddBroker(cfg.MQTTBroker).
- SetClientID(fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())).
- SetCleanSession(true)
+ // MQTT v5 Connect properties - these need to be pointers
+ cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds())
+ cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())
+ cliCfg.CleanStartOnInitialConnection = false
+
+ // Last Will and Testament (LWT)
+ cliCfg.WillMessage = &paho.WillMessage{
+ Topic: "weather/clients/status",
+ Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)),
+ QoS: 1,
+ Retain: true,
+ }
- if cfg.MQTTUsername != "" {
- opts.SetUsername(cfg.MQTTUsername)
- if cfg.MQTTPassword != "" {
- opts.SetPassword(cfg.MQTTPassword)
+ // TLS configuration (if using TLS)
+ if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
+ cliCfg.TlsCfg = &tls.Config{
+ MinVersion: tls.VersionTLS12,
}
}
- client := mqtt.NewClient(opts)
- token := client.Connect()
+ // Callbacks
+ cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ logger.Info("MQTT v5 connected",
+ "broker", serverURL.Host,
+ "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil,
+ "keep_alive", cfg.MQTTKeepAlive)
+ }
- if !token.WaitTimeout(cfg.HTTPTimeout) {
- return nil, fmt.Errorf("MQTT connection timeout")
+ cliCfg.OnConnectError = func(err error) {
+ logger.Error("MQTT connection error", "error", err)
}
- if err := token.Error(); err != nil {
- return nil, err
+ // Connect to broker
+ cm, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ // Wait for connection to be established
+ err = cm.AwaitConnection(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
}
- return client, nil
+ return cm, nil
}
-func publishObservation(client mqtt.Client, obs Observation, timeout time.Duration) error {
+func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error {
topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
// Safe JSON structure
@@ -492,14 +515,44 @@ func publishObservation(client mqtt.Client, obs Observation, timeout time.Durati
return err
}
- token := client.Publish(topic, 1, false, b)
- if !token.WaitTimeout(timeout) {
- return fmt.Errorf("publish timeout")
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)},
+ {Key: "parameter", Value: obs.Parameter},
+ {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)},
+ {Key: "data_type", Value: "observation"},
+ {Key: "source", Value: "fmi"},
+ },
+ }
+
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: false,
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish observation: %w", err)
}
- return token.Error()
+
+ logger.Debug("Published observation",
+ "topic", topic,
+ "station", obs.Station,
+ "parameter", obs.Parameter,
+ "time", obs.Time)
+
+ return nil
}
-func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration) error {
+func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error {
topic := fmt.Sprintf(
"weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
fc.RunTime.Format(time.RFC3339),
@@ -539,22 +592,66 @@ func publishForecast(client mqtt.Client, fc ForecastValue, timeout time.Duration
return err
}
- token := client.Publish(topic, 1, true, b)
- if !token.WaitTimeout(timeout) {
- return fmt.Errorf("publish timeout")
+ // MQTT v5 publish properties
+ props := &paho.PublishProperties{
+ User: []paho.UserProperty{
+ {Key: "model", Value: fc.Model},
+ {Key: "parameter", Value: fc.Parameter},
+ {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)},
+ {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)},
+ {Key: "data_type", Value: "forecast"},
+ {Key: "source", Value: "fmi"},
+ {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)},
+ {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)},
+ },
}
- return token.Error()
-}
-/* ============================================================
- Single Poller
- ============================================================ */
+ pb := &paho.Publish{
+ Topic: topic,
+ QoS: 1,
+ Retain: true, // Forecasts are retained since they don't change
+ Payload: b,
+ Properties: props,
+ }
+
+ // Publish with context timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err = cm.Publish(ctx, pb)
+ if err != nil {
+ return fmt.Errorf("failed to publish forecast: %w", err)
+ }
+
+ logger.Debug("Published forecast",
+ "topic", topic,
+ "parameter", fc.Parameter,
+ "run_time", fc.RunTime,
+ "forecast_time", fc.ForecastTime)
+
+ return nil
+}
func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
// Load initial state
lastObs := loadWatermark(cfg.WatermarkFile)
lastFcRun := time.Time{}
+ // Create MQTT client
+ mqttClient, err := createMQTTClient(ctx, cfg, logger)
+ if err != nil {
+ return fmt.Errorf("failed to connect to MQTT: %w", err)
+ }
+ defer func() {
+ // Graceful disconnect
+ discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := mqttClient.Disconnect(discCtx); err != nil {
+ logger.Error("Failed to disconnect MQTT", "error", err)
+ }
+ }()
+
// Create timers
obsTimer := time.NewTimer(0) // Start immediately
fcTimer := time.NewTimer(0) // Start immediately
@@ -562,16 +659,10 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
defer obsTimer.Stop()
defer fcTimer.Stop()
- // Create MQTT client
- mqttClient, err := createMQTTClient(cfg)
- if err != nil {
- return fmt.Errorf("failed to connect to MQTT: %w", err)
- }
- defer mqttClient.Disconnect(250)
-
logger.Info("Poller started",
"obs_interval", cfg.ObsPollEvery,
- "fc_interval", cfg.FcPollEvery)
+ "fc_interval", cfg.FcPollEvery,
+ "mqtt_broker", cfg.MQTTBroker)
for {
select {
@@ -586,20 +677,22 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
- body, err := fetchData(url, cfg.HTTPTimeout)
+ data, err := fetchData(url, cfg.HTTPTimeout)
if err != nil {
logger.Error("Failed to fetch observations", "error", err)
} else {
- obs, maxT, err := parseObservations(body, cfg.StationID, lastObs, logger)
- body.Close()
+ obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger)
if err != nil {
logger.Error("Failed to parse observations", "error", err)
} else if len(obs) > 0 {
// Publish observations
+ successCount := 0
for _, o := range obs {
- if err := publishObservation(mqttClient, o, cfg.HTTPTimeout); err != nil {
- logger.Error("Failed to publish observation", "error", err)
+ if err := publishObservation(mqttClient, o, logger); err != nil {
+ logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter)
+ } else {
+ successCount++
}
}
@@ -609,7 +702,12 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
logger.Error("Failed to store watermark", "error", err)
}
- logger.Info("Published observations", "count", len(obs), "new_watermark", lastObs)
+ logger.Info("Published observations",
+ "successful", successCount,
+ "total", len(obs),
+ "new_watermark", lastObs)
+ } else {
+ logger.Debug("No new observations found")
}
}
}
@@ -624,25 +722,32 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
logger.Info("Polling forecast", "from", start, "to", end)
url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
- body, err := fetchData(url, cfg.HTTPTimeout)
+ data, err := fetchData(url, cfg.HTTPTimeout)
if err != nil {
logger.Error("Failed to fetch forecast", "error", err)
} else {
- fc, runTime, err := parseForecast(body, cfg.HelLat, cfg.HelLon, logger)
- body.Close()
+ fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger)
if err != nil {
logger.Error("Failed to parse forecast", "error", err)
} else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
// Publish forecast
+ successCount := 0
for _, f := range fc {
- if err := publishForecast(mqttClient, f, cfg.HTTPTimeout); err != nil {
- logger.Error("Failed to publish forecast", "error", err)
+ if err := publishForecast(mqttClient, f, logger); err != nil {
+ logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter)
+ } else {
+ successCount++
}
}
lastFcRun = runTime
- logger.Info("Published forecast", "count", len(fc), "run_time", runTime)
+ logger.Info("Published forecast",
+ "successful", successCount,
+ "total", len(fc),
+ "run_time", runTime)
+ } else {
+ logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun)
}
}
fcTimer.Reset(cfg.FcPollEvery)
@@ -650,10 +755,6 @@ func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
}
}
-/* ============================================================
- Main
- ============================================================ */
-
func main() {
// Load configuration
cfg := loadConfig()
@@ -667,8 +768,20 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ // Setup signal handling for graceful shutdown
+ sigChan := make(chan os.Signal, 1)
+ // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ // For now, we don't handle signals in this simplified version
+ // In production, you'd want to handle SIGINT and SIGTERM
+ <-sigChan
+ logger.Info("Received shutdown signal")
+ cancel()
+ }()
+
// Run the poller
- logger.Info("Starting weather data poller")
+ logger.Info("Starting weather data poller with MQTT v5")
if err := runPoller(ctx, cfg, logger); err != nil {
logger.Error("Poller failed", "error", err)
os.Exit(1)