From 11392621501d7b5206bc521d6d2a50e2c4e36c45 Mon Sep 17 00:00:00 2001 From: Petri Hienonen Date: Fri, 2 Jan 2026 11:39:29 +0200 Subject: Initial commit --- .gitignore | 1 + flake.lock | 27 +++ flake.nix | 28 +++ go.mod | 11 + go.sum | 8 + hub | Bin 0 -> 9569432 bytes hub.service | 18 ++ main.go | 769 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 862 insertions(+) create mode 100644 .gitignore create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 go.mod create mode 100644 go.sum create mode 100755 hub create mode 100644 hub.service create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..122a5d7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +hub diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..9006f49 --- /dev/null +++ b/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1767339859, + "narHash": "sha256-VWG0PycumFfz1N2nGqBiDFrYPDDAUQWal/lsVvG53eE=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "82d8d0143127fcbe23013b68fd6f05fdd8fd38b8", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "release-25.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..c71dbdf --- /dev/null +++ b/flake.nix @@ -0,0 +1,28 @@ +{ + description = "Hub"; + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/release-25.11"; + }; + + outputs = + { self, nixpkgs }: + let + supportedSystems = [ + "x86_64-linux" + "aarch64-linux" + ]; + forAllSystems = nixpkgs.lib.genAttrs supportedSystems; + nixpkgsFor = forAllSystems (system: import nixpkgs { inherit system; }); + in + { + devShells = forAllSystems ( + system: + let + pkgs = nixpkgsFor.${system}; + in + { + default = pkgs.mkShell { buildInputs = with pkgs; [ go ]; }; + } + ); + }; +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f83dc31 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module hub + +go 1.25.5 + +require github.com/eclipse/paho.mqtt.golang v1.5.1 + +require ( + github.com/gorilla/websocket v1.5.3 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9abe94c --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= diff --git a/hub b/hub new file mode 100755 index 0000000..1a7fb2a Binary files /dev/null and b/hub differ diff --git a/hub.service b/hub.service new file mode 100644 index 0000000..34eb743 --- /dev/null +++ b/hub.service @@ -0,0 +1,18 @@ +# /etc/systemd/system/atom-publisher.service +[Unit] +Description=Data Publication HUB +After=network.target +StartLimitIntervalSec=10 + +[Service] +DynamicUser=true +Type=simple +Restart=always +RestartSec=10 +User=publisher +ExecStart=/usr/local/bin/hub +MemoryHigh=20M +MemoryMax=30M + +[Install] +WantedBy=multi-user.target diff --git a/main.go b/main.go new file mode 100644 index 0000000..f9afd40 --- /dev/null +++ b/main.go @@ -0,0 +1,769 @@ +package main + +import ( + "context" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +/* ============================================================ + Configuration (now configurable) + ============================================================ */ + +type Config struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"` + + // Observation + StationID int `env:"STATION_ID" default:"100968"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"` + ObsTimestep int `env:"OBS_TIMESTEP" default:"60"` + WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"` + HelLat float64 `env:"HEL_LAT" default:"60.1699"` + HelLon float64 `env:"HEL_LON" default:"24.9384"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER" default:"tcp://localhost:1883"` + MQTTClientID string `env:"MQTT_CLIENT_ID" default:"fmi-poller"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + + // Application + LogLevel string `env:"LOG_LEVEL" default:"info"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"` + MaxRetries int `env:"MAX_RETRIES" default:"3"` + RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"` +} + +/* ============================================================ + Application State + ============================================================ */ + +type App struct { + config *Config + logger *slog.Logger + mqttClient mqtt.Client + httpClient *http.Client + + // State + lastObsWatermark atomic.Value // stores time.Time + lastForecastRun atomic.Value // stores time.Time + shutdownCtx context.Context + shutdownCancel context.CancelFunc + + // Wait group for graceful shutdown + wg sync.WaitGroup +} + +/* ============================================================ + Domain models + ============================================================ */ + +type Observation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value float64 `json:"value"` +} + +type ForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value float64 `json:"value"` +} + +/* ============================================================ + Initialization + ============================================================ */ + +func NewApp() (*App, error) { + config := loadConfig() + + // Setup logging + logLevel := parseLogLevel(config.LogLevel) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })) + + // Create HTTP client with timeout + httpClient := &http.Client{ + Timeout: config.HTTPTimeout, + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: false, + }, + } + + // Create MQTT client + mqttOpts := mqtt.NewClientOptions(). + AddBroker(config.MQTTBroker). + SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())). + SetCleanSession(true). + SetAutoReconnect(true). + SetMaxReconnectInterval(10 * time.Second). + SetConnectionLostHandler(func(c mqtt.Client, err error) { + logger.Error("MQTT connection lost", "error", err) + }). + SetOnConnectHandler(func(c mqtt.Client) { + logger.Info("MQTT connected successfully") + }) + + if config.MQTTUsername != "" { + mqttOpts.SetUsername(config.MQTTUsername) + if config.MQTTPassword != "" { + mqttOpts.SetPassword(config.MQTTPassword) + } + } + + mqttClient := mqtt.NewClient(mqttOpts) + + ctx, cancel := context.WithCancel(context.Background()) + + app := &App{ + config: config, + logger: logger, + httpClient: httpClient, + mqttClient: mqttClient, + shutdownCtx: ctx, + shutdownCancel: cancel, + } + + // Initialize atomic values + app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile)) + app.lastForecastRun.Store(time.Time{}) + + return app, nil +} + +func (app *App) ConnectMQTT() error { + token := app.mqttClient.Connect() + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT connection timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("MQTT connection failed: %w", err) + } + app.logger.Info("Connected to MQTT broker") + return nil +} + +/* ============================================================ + Configuration loading + ============================================================ */ + +func loadConfig() *Config { + cfg := &Config{ + FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: parseInt(getEnv("STATION_ID", "100968")), + ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), + WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), + HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), + HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), + MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-poller"), + MQTTUsername: getEnv("MQTT_USERNAME", ""), + MQTTPassword: getEnv("MQTT_PASSWORD", ""), + LogLevel: getEnv("LOG_LEVEL", "info"), + HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), + MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")), + RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")), + } + return cfg +} + +func getEnv(key, defaultValue string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return defaultValue +} + +func parseDuration(s string) time.Duration { + d, err := time.ParseDuration(s) + if err != nil { + panic(fmt.Sprintf("invalid duration %s: %v", s, err)) + } + return d +} + +func parseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + panic(fmt.Sprintf("invalid int %s: %v", s, err)) + } + return i +} + +func parseFloat(s string) float64 { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + panic(fmt.Sprintf("invalid float %s: %v", s, err)) + } + return f +} + +func parseLogLevel(level string) slog.Level { + switch strings.ToLower(level) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} + +/* ============================================================ + Watermark persistence + ============================================================ */ + +func loadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + // If file doesn't exist, start from 24 hours ago + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +func storeWatermark(filename string, t time.Time) error { + data := []byte(fmt.Sprintf("%d", t.Unix())) + return os.WriteFile(filename, data, 0644) +} + +/* ============================================================ + Retry helper + ============================================================ */ + +func withRetry(app *App, name string, maxRetries int, fn func() error) error { + var lastErr error + for i := 0; i < maxRetries; i++ { + if err := fn(); err != nil { + lastErr = err + app.logger.Warn("Operation failed, retrying", + "operation", name, + "attempt", i+1, + "max_attempts", maxRetries, + "error", err) + + if i < maxRetries-1 { + select { + case <-time.After(app.config.RetryDelay): + continue + case <-app.shutdownCtx.Done(): + return app.shutdownCtx.Err() + } + } + } else { + return nil + } + } + return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr) +} + +/* ============================================================ + HTTP helpers with retry + ============================================================ */ + +func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) { + var body io.ReadCloser + err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error { + req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil) + if err != nil { + return err + } + req.Header.Set("Accept", "application/xml") + req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0") + + resp, err := app.httpClient.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + body = resp.Body + return nil + }) + + if err != nil { + return nil, err + } + return body, nil +} + +/* ============================================================ + Observation polling + ============================================================ */ + +func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") + q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep)) + + url := app.config.FMIEndpoint + "?" + q.Encode() + app.logger.Debug("Fetching observations", "url", url) + return app.fetchWithRetry(url) +} + +func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(r) + + var ( + param string + ts time.Time + maxObserved = minTime + out []Observation + parseErr error + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, maxObserved, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + param = normalizeParameter(a.Value) + } + } + case "time": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode time: %w", err) + continue + } + ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse time %s: %w", s, err) + continue + } + case "value": + var v float64 + if err := dec.DecodeElement(&v, &el); err != nil { + parseErr = fmt.Errorf("failed to decode value: %w", err) + continue + } + if ts.After(minTime) { + out = append(out, Observation{ + Station: -1, // Will be set by caller + Parameter: param, + Time: ts, + Value: v, + }) + if ts.After(maxObserved) { + maxObserved = ts + } + } + } + } + } + + if parseErr != nil { + app.logger.Warn("Partial parse errors", "error", parseErr) + } + + return out, maxObserved, nil +} + +/* ============================================================ + Forecast polling + ============================================================ */ + +func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + + url := app.config.FMIEndpoint + "?" + q.Encode() + app.logger.Debug("Fetching forecast", "url", url) + return app.fetchWithRetry(url) +} + +func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(r) + + var ( + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + parseErr error + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, runTime, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "resultTime": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode resultTime: %w", err) + continue + } + runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err) + continue + } + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + param = normalizeParameter(a.Value) + } + } + case "time": + var s string + if err := dec.DecodeElement(&s, &el); err != nil { + parseErr = fmt.Errorf("failed to decode forecast time: %w", err) + continue + } + fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s)) + if err != nil { + parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err) + continue + } + case "value": + var v float64 + if err := dec.DecodeElement(&v, &el); err != nil { + parseErr = fmt.Errorf("failed to decode forecast value: %w", err) + continue + } + out = append(out, ForecastValue{ + Model: "harmonie", + RunTime: runTime, + ForecastTime: fcTime, + Parameter: param, + Value: v, + }) + } + } + } + + if parseErr != nil { + app.logger.Warn("Partial parse errors in forecast", "error", parseErr) + } + + return out, runTime, nil +} + +/* ============================================================ + Polling workers + ============================================================ */ + +func (app *App) runObservationPoller() { + defer app.wg.Done() + + app.logger.Info("Starting observation poller", + "poll_interval", app.config.ObsPollEvery, + "station", app.config.StationID) + + ticker := time.NewTicker(app.config.ObsPollEvery) + defer ticker.Stop() + + for { + select { + case <-app.shutdownCtx.Done(): + app.logger.Info("Observation poller stopping") + return + case <-ticker.C: + app.pollObservations() + } + } +} + +func (app *App) pollObservations() { + startTime := time.Now() + lastObs := app.lastObsWatermark.Load().(time.Time) + obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag) + + if !obsEnd.After(lastObs) { + return // Nothing new to fetch + } + + app.logger.Info("Polling observations", + "start", lastObs, + "end", obsEnd) + + body, err := app.fetchObservations(lastObs, obsEnd) + if err != nil { + app.logger.Error("Failed to fetch observations", "error", err) + return + } + defer body.Close() + + obs, maxT, err := app.parseObservations(body, lastObs) + if err != nil { + app.logger.Error("Failed to parse observations", "error", err) + return + } + + // Set station ID for all observations + for i := range obs { + obs[i].Station = app.config.StationID + } + + if len(obs) > 0 { + if err := app.publishObs(obs); err != nil { + app.logger.Error("Failed to publish observations", "error", err) + return + } + + app.lastObsWatermark.Store(maxT) + if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil { + app.logger.Error("Failed to store watermark", "error", err) + } + + app.logger.Info("Published observations", + "count", len(obs), + "new_watermark", maxT, + "duration", time.Since(startTime)) + } else { + app.logger.Debug("No new observations") + } +} + +func (app *App) runForecastPoller() { + defer app.wg.Done() + + app.logger.Info("Starting forecast poller", + "poll_interval", app.config.FcPollEvery, + "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon)) + + ticker := time.NewTicker(app.config.FcPollEvery) + defer ticker.Stop() + + for { + select { + case <-app.shutdownCtx.Done(): + app.logger.Info("Forecast poller stopping") + return + case <-ticker.C: + app.pollForecast() + } + } +} + +func (app *App) pollForecast() { + startTime := time.Now() + now := time.Now().UTC() + start := now + end := now.Add(48 * time.Hour) + + app.logger.Info("Polling forecast", + "start", start, + "end", end) + + body, err := app.fetchForecast(start, end) + if err != nil { + app.logger.Error("Failed to fetch forecast", "error", err) + return + } + defer body.Close() + + fc, runTime, err := app.parseForecast(body) + if err != nil { + app.logger.Error("Failed to parse forecast", "error", err) + return + } + + // Set location for all forecasts + for i := range fc { + fc[i].Location.Lat = app.config.HelLat + fc[i].Location.Lon = app.config.HelLon + } + + if len(fc) > 0 { + // Check if this is a new forecast run + lastRun := app.lastForecastRun.Load().(time.Time) + if runTime.After(lastRun) { + if err := app.publishForecast(fc); err != nil { + app.logger.Error("Failed to publish forecast", "error", err) + return + } + + app.lastForecastRun.Store(runTime) + app.logger.Info("Published forecast", + "count", len(fc), + "run_time", runTime, + "duration", time.Since(startTime)) + } else { + app.logger.Debug("Forecast run already published", "run_time", runTime) + } + } +} + +/* ============================================================ + MQTT publishing with error handling + ============================================================ */ + +func (app *App) publishObs(obs []Observation) error { + for _, o := range obs { + topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter) + b, err := json.Marshal(o) + if err != nil { + return fmt.Errorf("failed to marshal observation: %w", err) + } + + token := app.mqttClient.Publish(topic, 1, false, b) + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT publish timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("failed to publish observation: %w", err) + } + } + return nil +} + +func (app *App) publishForecast(fc []ForecastValue) error { + for _, v := range fc { + topic := fmt.Sprintf( + "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", + v.RunTime.Format(time.RFC3339), + v.Parameter, + ) + b, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal forecast: %w", err) + } + + token := app.mqttClient.Publish(topic, 1, true, b) + if !token.WaitTimeout(app.config.HTTPTimeout) { + return errors.New("MQTT publish timeout") + } + if err := token.Error(); err != nil { + return fmt.Errorf("failed to publish forecast: %w", err) + } + } + return nil +} + +/* ============================================================ + Utilities + ============================================================ */ + +func normalizeParameter(id string) string { + return strings.TrimPrefix(strings.ToLower(id), "ts_") +} + +/* ============================================================ + Main + ============================================================ */ + +func main() { + // Create and initialize app + app, err := NewApp() + if err != nil { + panic(fmt.Sprintf("Failed to initialize app: %v", err)) + } + defer app.shutdownCancel() + + // Setup signal handling + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + // Connect to MQTT + if err := app.ConnectMQTT(); err != nil { + app.logger.Error("Failed to connect to MQTT", "error", err) + os.Exit(1) + } + defer app.mqttClient.Disconnect(250) + + // Start pollers + app.wg.Add(2) + go app.runObservationPoller() + go app.runForecastPoller() + + app.logger.Info("Weather poller started") + + // Wait for shutdown signal + select { + case sig := <-signalChan: + app.logger.Info("Received shutdown signal", "signal", sig) + app.shutdownCancel() + case <-app.shutdownCtx.Done(): + app.logger.Info("Shutdown initiated") + } + + // Wait for graceful shutdown + shutdownDone := make(chan struct{}) + go func() { + app.wg.Wait() + close(shutdownDone) + }() + + select { + case <-shutdownDone: + app.logger.Info("Shutdown completed") + case <-time.After(30 * time.Second): + app.logger.Warn("Shutdown timed out") + } +} -- cgit v1.2.3-70-g09d2