diff options
Diffstat (limited to 'main.go')
| -rw-r--r-- | main.go | 792 |
1 files changed, 0 insertions, 792 deletions
diff --git a/main.go b/main.go deleted file mode 100644 index c512022..0000000 --- a/main.go +++ /dev/null @@ -1,792 +0,0 @@ -package main - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "encoding/xml" - "fmt" - "io" - "log/slog" - "math" - "net/http" - "net/url" - "os" - "strconv" - "strings" - "time" - - "github.com/eclipse/paho.golang/autopaho" - "github.com/eclipse/paho.golang/paho" - "github.com/joho/godotenv" -) - -type Config struct { - // FMI API - FMIEndpoint string `env:"FMI_ENDPOINT"` - - // Observation - StationID int `env:"STATION_ID"` - ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` - ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` - ObsTimestep int `env:"OBS_TIMESTEP"` - WatermarkFile string `env:"WATERMARK_FILE"` - - // Forecast - FcPollEvery time.Duration `env:"FC_POLL_EVERY"` - HelLat float64 `env:"HEL_LAT"` - HelLon float64 `env:"HEL_LON"` - - // MQTT - MQTTBroker string `env:"MQTT_BROKER"` - MQTTClientID string `env:"MQTT_CLIENT_ID"` - MQTTUsername string `env:"MQTT_USERNAME"` - MQTTPassword string `env:"MQTT_PASSWORD"` - MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` - MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` - - // Application - LogLevel string `env:"LOG_LEVEL"` - HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` -} - -type Observation struct { - Station int `json:"station"` - Parameter string `json:"parameter"` - Time time.Time `json:"time"` - Value *float64 `json:"value,omitempty"` -} - -type ForecastValue struct { - Location struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - } `json:"location"` - - Model string `json:"model"` - RunTime time.Time `json:"run_time"` - ForecastTime time.Time `json:"forecast_time"` - Parameter string `json:"parameter"` - Value *float64 `json:"value,omitempty"` -} - -type JSONFloat64 float64 - -func (f JSONFloat64) MarshalJSON() ([]byte, error) { - if math.IsNaN(float64(f)) { - return []byte("null"), nil - } - return json.Marshal(float64(f)) -} - -func loadConfig() *Config { - // Load .env file if it exists - _ = godotenv.Load() - - cfg := &Config{ - FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), - StationID: parseInt(getEnv("STATION_ID", "100968")), - ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")), - ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")), - ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")), - WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"), - FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")), - HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")), - HelLon: parseFloat(getEnv("HEL_LON", "24.9384")), - MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"), - MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-publisher"), - MQTTUsername: getEnv("MQTT_USERNAME", ""), - MQTTPassword: getEnv("MQTT_PASSWORD", ""), - MQTTKeepAlive: parseDuration(getEnv("MQTT_KEEP_ALIVE", "60s")), - MQTTSessionExp: parseDuration(getEnv("MQTT_SESSION_EXP", "1h")), - LogLevel: getEnv("LOG_LEVEL", "debug"), - HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")), - } - - return cfg -} - -func getEnv(key, defaultValue string) string { - if value := os.Getenv(key); value != "" { - return value - } - return defaultValue -} - -func parseDuration(s string) time.Duration { - d, err := time.ParseDuration(s) - if err != nil { - return 5 * time.Minute // Default fallback - } - return d -} - -func parseInt(s string) int { - i, err := strconv.Atoi(s) - if err != nil { - return 0 - } - return i -} - -func parseFloat(s string) float64 { - f, err := strconv.ParseFloat(s, 64) - if err != nil { - return 0 - } - return f -} - -func parseLogLevel(level string) slog.Level { - switch strings.ToLower(level) { - case "debug": - return slog.LevelDebug - case "warn", "warning": - return slog.LevelWarn - case "error": - return slog.LevelError - default: - return slog.LevelInfo - } -} - -func loadWatermark(filename string) time.Time { - b, err := os.ReadFile(filename) - if err != nil { - return time.Now().UTC().Add(-24 * time.Hour) - } - sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) - if err != nil { - return time.Now().UTC().Add(-24 * time.Hour) - } - return time.Unix(sec, 0).UTC() -} - -func storeWatermark(filename string, t time.Time) error { - data := []byte(fmt.Sprintf("%d", t.Unix())) - return os.WriteFile(filename, data, 0644) -} - -func fetchData(url string, timeout time.Duration) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "application/xml") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) - } - - return io.ReadAll(resp.Body) -} - -func buildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { - q := url.Values{} - q.Set("service", "WFS") - q.Set("version", "2.0.0") - q.Set("request", "GetFeature") - q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") - q.Set("fmisid", fmt.Sprintf("%d", stationID)) - q.Set("starttime", start.Format(time.RFC3339)) - q.Set("endtime", end.Format(time.RFC3339)) - q.Set("timestep", fmt.Sprintf("%d", timestep)) - - return endpoint + "?" + q.Encode() -} - -func parseObservations(data []byte, stationID int, minTime time.Time, logger *slog.Logger) ([]Observation, time.Time, error) { - dec := xml.NewDecoder(bytes.NewReader(data)) - - var ( - param string - ts time.Time - maxObserved = minTime - out []Observation - inTVP bool - inTime bool - inValue bool - ) - - for { - tok, err := dec.Token() - if err == io.EOF { - break - } - if err != nil { - return nil, maxObserved, err - } - - switch el := tok.(type) { - case xml.StartElement: - switch el.Name.Local { - case "MeasurementTimeseries": - for _, a := range el.Attr { - if a.Name.Local == "id" { - parts := strings.Split(a.Value, "-") - if len(parts) > 0 { - param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) - } - } - } - case "MeasurementTVP": - inTVP = true - ts = time.Time{} - case "time": - if inTVP { - inTime = true - } - case "value": - if inTVP { - inValue = true - } - } - - case xml.CharData: - if inTime && inTVP { - s := strings.TrimSpace(string(el)) - if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { - ts = parsedTime - } - } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { - s := strings.TrimSpace(string(el)) - var valuePtr *float64 - - if !strings.EqualFold(s, "NaN") && s != "" { - if value, err := strconv.ParseFloat(s, 64); err == nil { - valuePtr = &value - } - } - - out = append(out, Observation{ - Station: stationID, - Parameter: param, - Time: ts, - Value: valuePtr, - }) - - if ts.After(maxObserved) { - maxObserved = ts - } - } - - case xml.EndElement: - switch el.Name.Local { - case "MeasurementTVP": - inTVP = false - case "time": - inTime = false - case "value": - inValue = false - } - } - } - - return out, maxObserved, nil -} - -func buildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { - q := url.Values{} - q.Set("service", "WFS") - q.Set("version", "2.0.0") - q.Set("request", "GetFeature") - q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") - q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) - q.Set("starttime", start.Format(time.RFC3339)) - q.Set("endtime", end.Format(time.RFC3339)) - - return endpoint + "?" + q.Encode() -} - -func parseForecast(data []byte, lat, lon float64, logger *slog.Logger) ([]ForecastValue, time.Time, error) { - dec := xml.NewDecoder(bytes.NewReader(data)) - - var ( - param string - runTime time.Time - fcTime time.Time - out []ForecastValue - inTVP bool - inTime bool - inValue bool - ) - - for { - tok, err := dec.Token() - if err == io.EOF { - break - } - if err != nil { - return nil, runTime, err - } - - switch el := tok.(type) { - case xml.StartElement: - switch el.Name.Local { - case "resultTime": - var s string - if err := dec.DecodeElement(&s, &el); err == nil { - if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { - runTime = parsedTime - } - } - case "MeasurementTimeseries": - for _, a := range el.Attr { - if a.Name.Local == "id" { - parts := strings.Split(a.Value, "-") - if len(parts) > 0 { - param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) - } - } - } - case "MeasurementTVP": - inTVP = true - fcTime = time.Time{} - case "time": - if inTVP { - inTime = true - } - case "value": - if inTVP { - inValue = true - } - } - - case xml.CharData: - if inTime && inTVP { - s := strings.TrimSpace(string(el)) - if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { - fcTime = parsedTime - } - } else if inValue && inTVP && !fcTime.IsZero() { - s := strings.TrimSpace(string(el)) - var valuePtr *float64 - - if !strings.EqualFold(s, "NaN") && s != "" { - if value, err := strconv.ParseFloat(s, 64); err == nil { - valuePtr = &value - } - } - - fc := ForecastValue{ - Location: struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - }{lat, lon}, - Model: "harmonie", - RunTime: runTime, - ForecastTime: fcTime, - Parameter: param, - Value: valuePtr, - } - out = append(out, fc) - } - - case xml.EndElement: - switch el.Name.Local { - case "MeasurementTVP": - inTVP = false - case "time": - inTime = false - case "value": - inValue = false - } - } - } - - return out, runTime, nil -} - -func createMQTTClient(ctx context.Context, cfg *Config, logger *slog.Logger) (*autopaho.ConnectionManager, error) { - serverURL, err := url.Parse(cfg.MQTTBroker) - if err != nil { - return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) - } - - // Client configuration - cliCfg := autopaho.ClientConfig{ - ServerUrls: []*url.URL{serverURL}, - KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), - ClientConfig: paho.ClientConfig{ - ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), - OnPublishReceived: []func(paho.PublishReceived) (bool, error){ - func(pr paho.PublishReceived) (bool, error) { - logger.Debug("Received message", "topic", pr.Packet.Topic) - return true, nil - }, - }, - // Set OnDisconnect here - OnServerDisconnect: func(d *paho.Disconnect) { - if d != nil { - logger.Warn("MQTT disconnected", "reason", d.ReasonCode) - } else { - logger.Info("MQTT disconnected gracefully") - } - }, - }, - ConnectUsername: cfg.MQTTUsername, - ConnectPassword: []byte(cfg.MQTTPassword), - ConnectTimeout: cfg.HTTPTimeout, - } - - // MQTT v5 Connect properties - these need to be pointers - cliCfg.KeepAlive = uint16(cfg.MQTTKeepAlive.Seconds()) - cliCfg.ClientID = fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()) - cliCfg.CleanStartOnInitialConnection = false - - // Last Will and Testament (LWT) - cliCfg.WillMessage = &paho.WillMessage{ - Topic: "weather/clients/status", - Payload: []byte(fmt.Sprintf(`{"client_id": "%s", "status": "offline"}`, cfg.MQTTClientID)), - QoS: 1, - Retain: true, - } - - // TLS configuration (if using TLS) - if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { - cliCfg.TlsCfg = &tls.Config{ - MinVersion: tls.VersionTLS12, - } - } - - // Callbacks - cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { - logger.Info("MQTT v5 connected", - "broker", serverURL.Host, - "session_present", connAck.Properties != nil && connAck.Properties.SessionExpiryInterval != nil, - "keep_alive", cfg.MQTTKeepAlive) - } - - cliCfg.OnConnectError = func(err error) { - logger.Error("MQTT connection error", "error", err) - } - - // Connect to broker - cm, err := autopaho.NewConnection(ctx, cliCfg) - if err != nil { - return nil, fmt.Errorf("failed to create MQTT connection: %w", err) - } - - // Wait for connection to be established - err = cm.AwaitConnection(ctx) - if err != nil { - return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) - } - - return cm, nil -} - -func publishObservation(cm *autopaho.ConnectionManager, obs Observation, logger *slog.Logger) error { - topic := fmt.Sprintf("weather/obs/fmi/%d/%s", obs.Station, obs.Parameter) - - // Safe JSON structure - type SafeObservation struct { - Station int `json:"station"` - Parameter string `json:"parameter"` - Time time.Time `json:"time"` - Value *JSONFloat64 `json:"value,omitempty"` - } - - var safeValue *JSONFloat64 - if obs.Value != nil { - v := JSONFloat64(*obs.Value) - safeValue = &v - } - - safeObs := SafeObservation{ - Station: obs.Station, - Parameter: obs.Parameter, - Time: obs.Time, - Value: safeValue, - } - - b, err := json.Marshal(safeObs) - if err != nil { - return err - } - - // MQTT v5 publish properties - props := &paho.PublishProperties{ - User: []paho.UserProperty{ - {Key: "station_id", Value: fmt.Sprintf("%d", obs.Station)}, - {Key: "parameter", Value: obs.Parameter}, - {Key: "observation_time", Value: obs.Time.Format(time.RFC3339)}, - {Key: "data_type", Value: "observation"}, - {Key: "source", Value: "fmi"}, - }, - } - - pb := &paho.Publish{ - Topic: topic, - QoS: 1, - Retain: false, - Payload: b, - Properties: props, - } - - // Publish with context timeout - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err = cm.Publish(ctx, pb) - if err != nil { - return fmt.Errorf("failed to publish observation: %w", err) - } - - logger.Debug("Published observation", - "topic", topic, - "station", obs.Station, - "parameter", obs.Parameter, - "value", obs.Value, - "time", obs.Time) - - return nil -} - -func publishForecast(cm *autopaho.ConnectionManager, fc ForecastValue, logger *slog.Logger) error { - topic := fmt.Sprintf( - "weather/forecast/fmi/harmonie/helsinki/run=%s/%s", - fc.RunTime.Format(time.RFC3339), - fc.Parameter, - ) - - // Safe JSON structure - type SafeForecastValue struct { - Location struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - } `json:"location"` - Model string `json:"model"` - RunTime time.Time `json:"run_time"` - ForecastTime time.Time `json:"forecast_time"` - Parameter string `json:"parameter"` - Value *JSONFloat64 `json:"value,omitempty"` - } - - var safeValue *JSONFloat64 - if fc.Value != nil { - v := JSONFloat64(*fc.Value) - safeValue = &v - } - - safeFc := SafeForecastValue{ - Location: fc.Location, - Model: fc.Model, - RunTime: fc.RunTime, - ForecastTime: fc.ForecastTime, - Parameter: fc.Parameter, - Value: safeValue, - } - - b, err := json.Marshal(safeFc) - if err != nil { - return err - } - - // MQTT v5 publish properties - props := &paho.PublishProperties{ - User: []paho.UserProperty{ - {Key: "model", Value: fc.Model}, - {Key: "parameter", Value: fc.Parameter}, - {Key: "run_time", Value: fc.RunTime.Format(time.RFC3339)}, - {Key: "forecast_time", Value: fc.ForecastTime.Format(time.RFC3339)}, - {Key: "data_type", Value: "forecast"}, - {Key: "source", Value: "fmi"}, - {Key: "location_lat", Value: fmt.Sprintf("%.4f", fc.Location.Lat)}, - {Key: "location_lon", Value: fmt.Sprintf("%.4f", fc.Location.Lon)}, - }, - } - - pb := &paho.Publish{ - Topic: topic, - QoS: 1, - Retain: true, // Forecasts are retained since they don't change - Payload: b, - Properties: props, - } - - // Publish with context timeout - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err = cm.Publish(ctx, pb) - if err != nil { - return fmt.Errorf("failed to publish forecast: %w", err) - } - - logger.Debug("Published forecast", - "topic", topic, - "parameter", fc.Parameter, - "run_time", fc.RunTime, - "forecast_time", fc.ForecastTime) - - return nil -} - -func runPoller(ctx context.Context, cfg *Config, logger *slog.Logger) error { - // Load initial state - lastObs := loadWatermark(cfg.WatermarkFile) - lastFcRun := time.Time{} - - // Create MQTT client - mqttClient, err := createMQTTClient(ctx, cfg, logger) - if err != nil { - return fmt.Errorf("failed to connect to MQTT: %w", err) - } - defer func() { - // Graceful disconnect - discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := mqttClient.Disconnect(discCtx); err != nil { - logger.Error("Failed to disconnect MQTT", "error", err) - } - }() - - // Create timers - obsTimer := time.NewTimer(0) // Start immediately - fcTimer := time.NewTimer(0) // Start immediately - - defer obsTimer.Stop() - defer fcTimer.Stop() - - logger.Info("Poller started", - "obs_interval", cfg.ObsPollEvery, - "fc_interval", cfg.FcPollEvery, - "mqtt_broker", cfg.MQTTBroker) - - for { - select { - case <-ctx.Done(): - logger.Info("Poller stopping") - return nil - - case <-obsTimer.C: - // Poll observations - obsEnd := time.Now().UTC().Add(-cfg.ObsSafetyLag) - if obsEnd.After(lastObs) { - logger.Info("Polling observations", "from", lastObs, "to", obsEnd) - - url := buildObservationsURL(cfg.FMIEndpoint, cfg.StationID, lastObs, obsEnd, cfg.ObsTimestep) - data, err := fetchData(url, cfg.HTTPTimeout) - if err != nil { - logger.Error("Failed to fetch observations", "error", err) - } else { - obs, maxT, err := parseObservations(data, cfg.StationID, lastObs, logger) - - if err != nil { - logger.Error("Failed to parse observations", "error", err) - } else if len(obs) > 0 { - // Publish observations - successCount := 0 - for _, o := range obs { - if err := publishObservation(mqttClient, o, logger); err != nil { - logger.Error("Failed to publish observation", "error", err, "station", o.Station, "parameter", o.Parameter) - } else { - successCount++ - } - } - - // Update watermark - lastObs = maxT - if err := storeWatermark(cfg.WatermarkFile, lastObs); err != nil { - logger.Error("Failed to store watermark", "error", err) - } - - logger.Info("Published observations", - "successful", successCount, - "total", len(obs), - "new_watermark", lastObs) - } else { - logger.Debug("No new observations found") - } - } - } - obsTimer.Reset(cfg.ObsPollEvery) - - case <-fcTimer.C: - // Poll forecast - now := time.Now().UTC() - start := now - end := now.Add(48 * time.Hour) - - logger.Info("Polling forecast", "from", start, "to", end) - - url := buildForecastURL(cfg.FMIEndpoint, cfg.HelLat, cfg.HelLon, start, end) - data, err := fetchData(url, cfg.HTTPTimeout) - if err != nil { - logger.Error("Failed to fetch forecast", "error", err) - } else { - fc, runTime, err := parseForecast(data, cfg.HelLat, cfg.HelLon, logger) - - if err != nil { - logger.Error("Failed to parse forecast", "error", err) - } else if len(fc) > 0 && (runTime.After(lastFcRun) || lastFcRun.IsZero()) { - // Publish forecast - successCount := 0 - for _, f := range fc { - if err := publishForecast(mqttClient, f, logger); err != nil { - logger.Error("Failed to publish forecast", "error", err, "parameter", f.Parameter) - } else { - successCount++ - } - } - - lastFcRun = runTime - logger.Info("Published forecast", - "successful", successCount, - "total", len(fc), - "run_time", runTime) - } else { - logger.Debug("No new forecast data or same run time", "run_time", runTime, "last_run", lastFcRun) - } - } - fcTimer.Reset(cfg.FcPollEvery) - } - } -} - -func main() { - // Load configuration - cfg := loadConfig() - - // Setup logging - logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: parseLogLevel(cfg.LogLevel), - })) - - // Create context for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Setup signal handling for graceful shutdown - sigChan := make(chan os.Signal, 1) - // signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - // For now, we don't handle signals in this simplified version - // In production, you'd want to handle SIGINT and SIGTERM - <-sigChan - logger.Info("Received shutdown signal") - cancel() - }() - - // Run the poller - logger.Info("Starting weather data poller with MQTT v5") - if err := runPoller(ctx, cfg, logger); err != nil { - logger.Error("Poller failed", "error", err) - os.Exit(1) - } - - logger.Info("Poller stopped gracefully") -} |
