aboutsummaryrefslogtreecommitdiffstats
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go792
1 files changed, 0 insertions, 792 deletions
diff --git a/main.go b/main.go
deleted file mode 100644
index c512022..0000000
--- a/main.go
+++ /dev/null
@@ -1,792 +0,0 @@
-package main
-
-import (
- "bytes"
- "context"
- "crypto/tls"
- "encoding/json"
- "encoding/xml"
- "fmt"
- "io"
- "log/slog"
- "math"
- "net/http"
- "net/url"
- "os"
- "strconv"
- "strings"
- "time"
-
- "github.com/eclipse/paho.golang/autopaho"
- "github.com/eclipse/paho.golang/paho"
- "github.com/joho/godotenv"
-)
-
-type Config struct {
- // FMI API
- FMIEndpoint string `env:"FMI_ENDPOINT"`
-
- // Observation
- StationID int `env:"STATION_ID"`
- ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
- ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
- ObsTimestep int `env:"OBS_TIMESTEP"`
- WatermarkFile string `env:"WATERMARK_FILE"`
-
- // Forecast
- FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
- HelLat float64 `env:"HEL_LAT"`
- HelLon float64 `env:"HEL_LON"`
-
- // MQTT
- MQTTBroker string `env:"MQTT_BROKER"`
- MQTTClientID string `env:"MQTT_CLIENT_ID"`
- MQTTUsername string `env:"MQTT_USERNAME"`
- MQTTPassword string `env:"MQTT_PASSWORD"`
- MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
- MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
-
- // Application
- LogLevel string `env:"LOG_LEVEL"`
- HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
-}
-
-type Observation struct {
- Station int `json:"station"`
- Parameter string `json:"parameter"`
- Time time.Time `json:"time"`
- Value *float64 `json:"value,omitempty"`
-}
-
-type ForecastValue struct {
- Location struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- } `json:"location"`
-
- Model string `json:"model"`
- RunTime time.Time `json:"run_time"`
- ForecastTime time.Time `json:"forecast_time"`
- Parameter string `json:"parameter"`
- Value *float64 `json:"value,omitempty"`
-}
-
-type JSONFloat64 float64
-
-func (f JSONFloat64) MarshalJSON() ([]byte, error) {
- if math.IsNaN(float64(f)) {
- return []byte("null"), nil
- }
- return json.Marshal(float64(f))
-}
-
-func loadConfig() *Config {
- // Load .env file if it exists
- _ = godotenv.Load()
-
- cfg := &Config{
- FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
- StationID: parseInt(getEnv("STATION_ID", "100968")),
- ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")),
- ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")),
- ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")),
- WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"),
- FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
- HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
- HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
- MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
- MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"),
- MQTTUsername: getEnv("MQTT_USERNAME", ""),
- MQTTPassword: getEnv("MQTT_PASSWORD", ""),
- MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")),
- MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")),
- LogLevel: getEnv("LOG_LEVEL", "debug"),
- HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
- }
-
- return cfg
-}
-
-func getEnv(key, defaultValue string) string {
- if value := os.Getenv(key); value != "" {
- return value
- }
- return defaultValue
-}
-
-func parseDuration(s string) time.Duration {
- d, err := time.ParseDuration(s)
- if err != nil {
- return 5 * time.Minute // Default fallback
- }
- return d
-}
-
-func parseInt(s string) int {
- i, err := strconv.Atoi(s)
- if err != nil {
- return 0
- }
- return i
-}
-
-func parseFloat(s string) float64 {
- f, err := strconv.ParseFloat(s, 64)
- if err != nil {
- return 0
- }
- return f
-}
-
-func parseLogLevel(level string) slog.Level {
- switch strings.ToLower(level) {
- case "debug":
- return slog.LevelDebug
- case "warn", "warning":
- return slog.LevelWarn
- case "error":
- return slog.LevelError
- default:
- return slog.LevelInfo
- }
-}
-
-func loadWatermark(filename string) time.Time {
- b, err := os.ReadFile(filename)
- if err != nil {
- return time.Now().UTC().Add(-24 * time.Hour)
- }
- sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
- if err != nil {
- return time.Now().UTC().Add(-24 * time.Hour)
- }
- return time.Unix(sec, 0).UTC()
-}
-
-func storeWatermark(filename string, t time.Time) error {
- data := []byte(fmt.Sprintf("%d", t.Unix()))
- return os.WriteFile(filename, data, 0644)
-}
-
-func fetchData(url string, timeout time.Duration) ([]byte, error) {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
-
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
- if err != nil {
- return nil, err
- }
- req.Header.Set("Accept", "application/xml")
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
- }
-
- return io.ReadAll(resp.Body)
-}
-
-func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
- q := url.Values{}
- q.Set("service", "WFS")
- q.Set("version", "2.0.0")
- q.Set("request", "GetFeature")
- q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
- q.Set("fmisid", fmt.Sprintf("%d", stationID))
- q.Set("starttime", start.Format(time.RFC3339))
- q.Set("endtime", end.Format(time.RFC3339))
- q.Set("timestep", fmt.Sprintf("%d", timestep))
-
- return endpoint + "?" + q.Encode()
-}
-
-func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) {
- dec := xml.NewDecoder(bytes.NewReader(data))
-
- var (
- param string
- ts time.Time
- maxObserved = minTime
- out []Observation
- inTVP bool
- inTime bool
- inValue bool
- )
-
- for {
- tok, err := dec.Token()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, maxObserved, err
- }
-
- switch el := tok.(type) {
- case xml.StartElement:
- switch el.Name.Local {
- case "MeasurementTimeseries":
- for _, a := range el.Attr {
- if a.Name.Local == "id" {
- parts := strings.Split(a.Value, "-")
- if len(parts) > 0 {
- param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
- }
- }
- }
- case "MeasurementTVP":
- inTVP = true
- ts = time.Time{}
- case "time":
- if inTVP {
- inTime = true
- }
- case "value":
- if inTVP {
- inValue = true
- }
- }
-
- case xml.CharData:
- if inTime && inTVP {
- s := strings.TrimSpace(string(el))
- if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
- ts = parsedTime
- }
- } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) {
- s := strings.TrimSpace(string(el))
- var valuePtr *float64
-
- if !strings.EqualFold(s, "NaN") && s != "" {
- if value, err := strconv.ParseFloat(s, 64); err == nil {
- valuePtr = &value
- }
- }
-
- out = append(out, Observation{
- Station: stationID,
- Parameter: param,
- Time: ts,
- Value: valuePtr,
- })
-
- if ts.After(maxObserved) {
- maxObserved = ts
- }
- }
-
- case xml.EndElement:
- switch el.Name.Local {
- case "MeasurementTVP":
- inTVP = false
- case "time":
- inTime = false
- case "value":
- inValue = false
- }
- }
- }
-
- return out, maxObserved, nil
-}
-
-func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
- q := url.Values{}
- q.Set("service", "WFS")
- q.Set("version", "2.0.0")
- q.Set("request", "GetFeature")
- q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
- q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon))
- q.Set("starttime", start.Format(time.RFC3339))
- q.Set("endtime", end.Format(time.RFC3339))
-
- return endpoint + "?" + q.Encode()
-}
-
-func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) {
- dec := xml.NewDecoder(bytes.NewReader(data))
-
- var (
- param string
- runTime time.Time
- fcTime time.Time
- out []ForecastValue
- inTVP bool
- inTime bool
- inValue bool
- )
-
- for {
- tok, err := dec.Token()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, runTime, err
- }
-
- switch el := tok.(type) {
- case xml.StartElement:
- switch el.Name.Local {
- case "resultTime":
- var s string
- if err := dec.DecodeElement(&s, &el); err == nil {
- if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil {
- runTime = parsedTime
- }
- }
- case "MeasurementTimeseries":
- for _, a := range el.Attr {
- if a.Name.Local == "id" {
- parts := strings.Split(a.Value, "-")
- if len(parts) > 0 {
- param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
- }
- }
- }
- case "MeasurementTVP":
- inTVP = true
- fcTime = time.Time{}
- case "time":
- if inTVP {
- inTime = true
- }
- case "value":
- if inTVP {
- inValue = true
- }
- }
-
- case xml.CharData:
- if inTime && inTVP {
- s := strings.TrimSpace(string(el))
- if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
- fcTime = parsedTime
- }
- } else if inValue && inTVP && !fcTime.IsZero() {
- s := strings.TrimSpace(string(el))
- var valuePtr *float64
-
- if !strings.EqualFold(s, "NaN") && s != "" {
- if value, err := strconv.ParseFloat(s, 64); err == nil {
- valuePtr = &value
- }
- }
-
- fc := ForecastValue{
- Location: struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- }{lat, lon},
- Model: "harmonie",
- RunTime: runTime,
- ForecastTime: fcTime,
- Parameter: param,
- Value: valuePtr,
- }
- out = append(out, fc)
- }
-
- case xml.EndElement:
- switch el.Name.Local {
- case "MeasurementTVP":
- inTVP = false
- case "time":
- inTime = false
- case "value":
- inValue = false
- }
- }
- }
-
- return out, runTime, nil
-}
-
-func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) {
- serverURL, err := url.Parse(cfg.MQTTBroker)
- if err != nil {
- return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
- }
-
- // Client configuration
- cliCfg := autopaho.ClientConfig{
- ServerUrls: []*url.URL{serverURL},
- KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()),
- ClientConfig: paho.ClientConfig{
- ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()),
- OnPublishReceived: []func(paho.PublishReceived) (bool, error){
- func(pr paho.PublishReceived) (bool, error) {
- logger.Debug("Received message", "topic", pr.Packet.Topic)
- return true, nil
- },
- },
- // Set OnDisconnect here
- OnServerDisconnect: func(d *paho.Disconnect) {
- if d != nil {
- logger.Warn("MQTT disconnected", "reason", d.ReasonCode)
- } else {
- logger.Info("MQTT disconnected gracefully")
- }
- },
- },
- ConnectUsername: cfg.MQTTUsername,
- ConnectPassword: []byte(cfg.MQTTPassword),
- ConnectTimeout: cfg.HTTPTimeout,
- }
-
- // MQTT v5 Connect properties - these need to be pointers
- cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds())
- cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid())
- cliCfg.CleanStartOnInitialConnection = false
-
- // Last Will and Testament (LWT)
- cliCfg.WillMessage = &paho.WillMessage{
- Topic: "weather/clients/status",
- Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)),
- QoS: 1,
- Retain: true,
- }
-
- // TLS configuration (if using TLS)
- if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
- cliCfg.TlsCfg = &tls.Config{
- MinVersion: tls.VersionTLS12,
- }
- }
-
- // Callbacks
- cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
- logger.Info("MQTT v5 connected",
- "broker", serverURL.Host,
- "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil,
- "keep_alive", cfg.MQTTKeepAlive)
- }
-
- cliCfg.OnConnectError = func(err error) {
- logger.Error("MQTT connection error", "error", err)
- }
-
- // Connect to broker
- cm, err := autopaho.NewConnection(ctx, cliCfg)
- if err != nil {
- return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
- }
-
- // Wait for connection to be established
- err = cm.AwaitConnection(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
- }
-
- return cm, nil
-}
-
-func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error {
- topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter)
-
- // Safe JSON structure
- type SafeObservation struct {
- Station int `json:"station"`
- Parameter string `json:"parameter"`
- Time time.Time `json:"time"`
- Value *JSONFloat64 `json:"value,omitempty"`
- }
-
- var safeValue *JSONFloat64
- if obs.Value != nil {
- v := JSONFloat64(*obs.Value)
- safeValue = &v
- }
-
- safeObs := SafeObservation{
- Station: obs.Station,
- Parameter: obs.Parameter,
- Time: obs.Time,
- Value: safeValue,
- }
-
- b, err := json.Marshal(safeObs)
- if err != nil {
- return err
- }
-
- // MQTT v5 publish properties
- props := &paho.PublishProperties{
- User: []paho.UserProperty{
- {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)},
- {Key: "parameter", Value: obs.Parameter},
- {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)},
- {Key: "data_type", Value: "observation"},
- {Key: "source", Value: "fmi"},
- },
- }
-
- pb := &paho.Publish{
- Topic: topic,
- QoS: 1,
- Retain: false,
- Payload: b,
- Properties: props,
- }
-
- // Publish with context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- _, err = cm.Publish(ctx, pb)
- if err != nil {
- return fmt.Errorf("failed to publish observation: %w", err)
- }
-
- logger.Debug("Published observation",
- "topic", topic,
- "station", obs.Station,
- "parameter", obs.Parameter,
- "value", obs.Value,
- "time", obs.Time)
-
- return nil
-}
-
-func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error {
- topic := fmt.Sprintf(
- "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
- fc.RunTime.Format(time.RFC3339),
- fc.Parameter,
- )
-
- // Safe JSON structure
- type SafeForecastValue struct {
- Location struct {
- Lat float64 `json:"lat"`
- Lon float64 `json:"lon"`
- } `json:"location"`
- Model string `json:"model"`
- RunTime time.Time `json:"run_time"`
- ForecastTime time.Time `json:"forecast_time"`
- Parameter string `json:"parameter"`
- Value *JSONFloat64 `json:"value,omitempty"`
- }
-
- var safeValue *JSONFloat64
- if fc.Value != nil {
- v := JSONFloat64(*fc.Value)
- safeValue = &v
- }
-
- safeFc := SafeForecastValue{
- Location: fc.Location,
- Model: fc.Model,
- RunTime: fc.RunTime,
- ForecastTime: fc.ForecastTime,
- Parameter: fc.Parameter,
- Value: safeValue,
- }
-
- b, err := json.Marshal(safeFc)
- if err != nil {
- return err
- }
-
- // MQTT v5 publish properties
- props := &paho.PublishProperties{
- User: []paho.UserProperty{
- {Key: "model", Value: fc.Model},
- {Key: "parameter", Value: fc.Parameter},
- {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)},
- {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)},
- {Key: "data_type", Value: "forecast"},
- {Key: "source", Value: "fmi"},
- {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)},
- {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)},
- },
- }
-
- pb := &paho.Publish{
- Topic: topic,
- QoS: 1,
- Retain: true, // Forecasts are retained since they don't change
- Payload: b,
- Properties: props,
- }
-
- // Publish with context timeout
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- _, err = cm.Publish(ctx, pb)
- if err != nil {
- return fmt.Errorf("failed to publish forecast: %w", err)
- }
-
- logger.Debug("Published forecast",
- "topic", topic,
- "parameter", fc.Parameter,
- "run_time", fc.RunTime,
- "forecast_time", fc.ForecastTime)
-
- return nil
-}
-
-func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error {
- // Load initial state
- lastObs := loadWatermark(cfg.WatermarkFile)
- lastFcRun := time.Time{}
-
- // Create MQTT client
- mqttClient, err := createMQTTClient(ctx, cfg, logger)
- if err != nil {
- return fmt.Errorf("failed to connect to MQTT: %w", err)
- }
- defer func() {
- // Graceful disconnect
- discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- if err := mqttClient.Disconnect(discCtx); err != nil {
- logger.Error("Failed to disconnect MQTT", "error", err)
- }
- }()
-
- // Create timers
- obsTimer := time.NewTimer(0) // Start immediately
- fcTimer := time.NewTimer(0) // Start immediately
-
- defer obsTimer.Stop()
- defer fcTimer.Stop()
-
- logger.Info("Poller started",
- "obs_interval", cfg.ObsPollEvery,
- "fc_interval", cfg.FcPollEvery,
- "mqtt_broker", cfg.MQTTBroker)
-
- for {
- select {
- case <-ctx.Done():
- logger.Info("Poller stopping")
- return nil
-
- case <-obsTimer.C:
- // Poll observations
- obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag)
- if obsEnd.After(lastObs) {
- logger.Info("Polling observations", "from", lastObs, "to", obsEnd)
-
- url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep)
- data, err := fetchData(url, cfg.HTTPTimeout)
- if err != nil {
- logger.Error("Failed to fetch observations", "error", err)
- } else {
- obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger)
-
- if err != nil {
- logger.Error("Failed to parse observations", "error", err)
- } else if len(obs) > 0 {
- // Publish observations
- successCount := 0
- for _, o := range obs {
- if err := publishObservation(mqttClient, o, logger); err != nil {
- logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter)
- } else {
- successCount++
- }
- }
-
- // Update watermark
- lastObs = maxT
- if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil {
- logger.Error("Failed to store watermark", "error", err)
- }
-
- logger.Info("Published observations",
- "successful", successCount,
- "total", len(obs),
- "new_watermark", lastObs)
- } else {
- logger.Debug("No new observations found")
- }
- }
- }
- obsTimer.Reset(cfg.ObsPollEvery)
-
- case <-fcTimer.C:
- // Poll forecast
- now := time.Now().UTC()
- start := now
- end := now.Add(48 * time.Hour)
-
- logger.Info("Polling forecast", "from", start, "to", end)
-
- url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end)
- data, err := fetchData(url, cfg.HTTPTimeout)
- if err != nil {
- logger.Error("Failed to fetch forecast", "error", err)
- } else {
- fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger)
-
- if err != nil {
- logger.Error("Failed to parse forecast", "error", err)
- } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) {
- // Publish forecast
- successCount := 0
- for _, f := range fc {
- if err := publishForecast(mqttClient, f, logger); err != nil {
- logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter)
- } else {
- successCount++
- }
- }
-
- lastFcRun = runTime
- logger.Info("Published forecast",
- "successful", successCount,
- "total", len(fc),
- "run_time", runTime)
- } else {
- logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun)
- }
- }
- fcTimer.Reset(cfg.FcPollEvery)
- }
- }
-}
-
-func main() {
- // Load configuration
- cfg := loadConfig()
-
- // Setup logging
- logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
- Level: parseLogLevel(cfg.LogLevel),
- }))
-
- // Create context for graceful shutdown
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Setup signal handling for graceful shutdown
- sigChan := make(chan os.Signal, 1)
- // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-
- go func() {
- // For now, we don't handle signals in this simplified version
- // In production, you'd want to handle SIGINT and SIGTERM
- <-sigChan
- logger.Info("Received shutdown signal")
- cancel()
- }()
-
- // Run the poller
- logger.Info("Starting weather data poller with MQTT v5")
- if err := runPoller(ctx, cfg, logger); err != nil {
- logger.Error("Poller failed", "error", err)
- os.Exit(1)
- }
-
- logger.Info("Poller stopped gracefully")
-}