diff options
| author | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-04 11:52:47 +0200 |
|---|---|---|
| committer | Petri Hienonen <petri.hienonen@gmail.com> | 2026-01-04 11:52:47 +0200 |
| commit | 59491201976316a30ffc475dd99b0af02b5e997d (patch) | |
| tree | ecf395594d5d289d855eba16f786e0fb66c1d814 /internal | |
| parent | 4e0ca0509c6b314eea8a7b2df6d093f5d9b7e70f (diff) | |
| download | pub-59491201976316a30ffc475dd99b0af02b5e997d.tar.zst | |
Both publisher and subscriber
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/weather/config.go | 102 | ||||
| -rw-r--r-- | internal/weather/fmi.go | 258 | ||||
| -rw-r--r-- | internal/weather/mqtt.go | 76 | ||||
| -rw-r--r-- | internal/weather/types.go | 44 | ||||
| -rw-r--r-- | internal/weather/utils.go | 82 |
5 files changed, 562 insertions, 0 deletions
diff --git a/internal/weather/config.go b/internal/weather/config.go new file mode 100644 index 0000000..6450da4 --- /dev/null +++ b/internal/weather/config.go @@ -0,0 +1,102 @@ +package weather + +import ( + "time" +) + +// PublisherConfig configuration for the publisher +type PublisherConfig struct { + // FMI API + FMIEndpoint string `env:"FMI_ENDPOINT"` + + // Observation + StationID int `env:"STATION_ID"` + ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"` + ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"` + ObsTimestep int `env:"OBS_TIMESTEP"` + WatermarkFile string `env:"WATERMARK_FILE"` + + // Forecast + FcPollEvery time.Duration `env:"FC_POLL_EVERY"` + HelLat float64 `env:"HEL_LAT"` + HelLon float64 `env:"HEL_LON"` + + // MQTT + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"` +} + +// SubscriberConfig configuration for the subscriber +type SubscriberConfig struct { + // MQTT Configuration + MQTTBroker string `env:"MQTT_BROKER"` + MQTTClientID string `env:"MQTT_CLIENT_ID"` + MQTTUsername string `env:"MQTT_USERNAME"` + MQTTPassword string `env:"MQTT_PASSWORD"` + MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"` + MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"` + + // Subscription Configuration + Topics []string `env:"SUBSCRIBE_TOPICS"` + QoS int `env:"SUBSCRIBE_QOS"` + + // Application + LogLevel string `env:"LOG_LEVEL"` + EnableDebug bool `env:"ENABLE_DEBUG"` +} + +// LoadPublisherConfig loads publisher configuration from environment +func LoadPublisherConfig() *PublisherConfig { + cfg := &PublisherConfig{ + FMIEndpoint: GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"), + StationID: ParseInt(GetEnv("STATION_ID", "100968")), + ObsPollEvery: ParseDuration(GetEnv("OBS_POLL_EVERY", "5m")), + ObsSafetyLag: ParseDuration(GetEnv("OBS_SAFETY_LAG", "2m")), + ObsTimestep: ParseInt(GetEnv("OBS_TIMESTEP", "60")), + WatermarkFile: GetEnv("WATERMARK_FILE", "obs_watermark.txt"), + FcPollEvery: ParseDuration(GetEnv("FC_POLL_EVERY", "1h")), + HelLat: ParseFloat(GetEnv("HEL_LAT", "60.1699")), + HelLon: ParseFloat(GetEnv("HEL_LON", "24.9384")), + MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-publisher"), + MQTTUsername: GetEnv("MQTT_USERNAME", ""), + MQTTPassword: GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")), + LogLevel: GetEnv("LOG_LEVEL", "info"), + HTTPTimeout: ParseDuration(GetEnv("HTTP_TIMEOUT", "30s")), + } + + return cfg +} + +// LoadSubscriberConfig loads subscriber configuration from environment +func LoadSubscriberConfig() *SubscriberConfig { + qos := ParseInt(GetEnv("SUBSCRIBE_QOS", "1")) + if qos < 0 || qos > 2 { + qos = 1 + } + + cfg := &SubscriberConfig{ + MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"), + MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-subscriber"), + MQTTUsername: GetEnv("MQTT_USERNAME", ""), + MQTTPassword: GetEnv("MQTT_PASSWORD", ""), + MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")), + MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")), + Topics: ParseTopics(GetEnv("SUBSCRIBE_TOPICS", "")), + QoS: qos, + LogLevel: GetEnv("LOG_LEVEL", "info"), + EnableDebug: GetEnv("ENABLE_DEBUG", "false") == "true", + } + + return cfg +} diff --git a/internal/weather/fmi.go b/internal/weather/fmi.go new file mode 100644 index 0000000..52c9f63 --- /dev/null +++ b/internal/weather/fmi.go @@ -0,0 +1,258 @@ +package weather + +import ( + "bytes" + "context" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +// BuildObservationsURL constructs the URL for fetching observations from FMI. +func BuildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") + q.Set("fmisid", fmt.Sprintf("%d", stationID)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + q.Set("timestep", fmt.Sprintf("%d", timestep)) + + return endpoint + "?" + q.Encode() +} + +// BuildForecastURL constructs the URL for fetching forecasts from FMI. +func BuildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + + return endpoint + "?" + q.Encode() +} + +// FetchData fetches data from the given URL with a timeout. +func FetchData(url string, timeout time.Duration) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/xml") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + return io.ReadAll(resp.Body) +} + +// ParseObservations parses the XML response from FMI observations query. +func ParseObservations(data []byte, stationID int, minTime time.Time) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + ts time.Time + maxObserved = minTime + out []Observation + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, maxObserved, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + ts = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + ts = parsedTime + } + } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + out = append(out, Observation{ + Station: stationID, + Parameter: param, + Time: ts, + Value: valuePtr, + }) + + if ts.After(maxObserved) { + maxObserved = ts + } + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, maxObserved, nil +} + +// ParseForecast parses the XML response from FMI forecast query. +func ParseForecast(data []byte, lat, lon float64) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, runTime, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "resultTime": + var s string + if err := dec.DecodeElement(&s, &el); err == nil { + if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { + runTime = parsedTime + } + } + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + fcTime = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + fcTime = parsedTime + } + } else if inValue && inTVP && !fcTime.IsZero() { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + fc := ForecastValue{ + Location: struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + }{lat, lon}, + Model: "harmonie", + RunTime: runTime, + ForecastTime: fcTime, + Parameter: param, + Value: valuePtr, + } + out = append(out, fc) + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, runTime, nil +} diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go new file mode 100644 index 0000000..211b441 --- /dev/null +++ b/internal/weather/mqtt.go @@ -0,0 +1,76 @@ +package weather + +import ( + "context" + "crypto/tls" + "fmt" + "net/url" + "os" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" +) + +// MQTTConfig holds MQTT connection configuration +type MQTTConfig struct { + Broker string + ClientID string + Username string + Password string + KeepAlive time.Duration + SessionExpiry time.Duration + Timeout time.Duration + OnPublishReceived []func(paho.PublishReceived) (bool, error) +} + +// CreateMQTTClient creates a MQTT client with the given configuration +func CreateMQTTClient(ctx context.Context, cfg MQTTConfig, + onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) { + + serverURL, err := url.Parse(cfg.Broker) + if err != nil { + return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.KeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()), + OnPublishReceived: cfg.OnPublishReceived, + OnServerDisconnect: func(d *paho.Disconnect) { + + // Default disconnect handler + }, + }, + ConnectUsername: cfg.Username, + ConnectPassword: []byte(cfg.Password), + ConnectTimeout: cfg.Timeout, + } + + cliCfg.CleanStartOnInitialConnection = false + cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds()) + + if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { + cliCfg.TlsCfg = &tls.Config{ + MinVersion: tls.VersionTLS12, + } + } + + if onConnect != nil { + cliCfg.OnConnectionUp = onConnect + } + + cm, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return nil, fmt.Errorf("failed to create MQTT connection: %w", err) + } + + err = cm.AwaitConnection(ctx) + if err != nil { + return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) + } + + return cm, nil +} diff --git a/internal/weather/types.go b/internal/weather/types.go new file mode 100644 index 0000000..5563bfa --- /dev/null +++ b/internal/weather/types.go @@ -0,0 +1,44 @@ +package weather + +import ( + "encoding/json" + "math" + "time" +) + +// Observation represents a weather observation from a station +type Observation struct { + Station int `json:"station"` + Parameter string `json:"parameter"` + Time time.Time `json:"time"` + Value *float64 `json:"value,omitempty"` +} + +// ForecastValue represents a weather forecast value +type ForecastValue struct { + Location struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + } `json:"location"` + Model string `json:"model"` + RunTime time.Time `json:"run_time"` + ForecastTime time.Time `json:"forecast_time"` + Parameter string `json:"parameter"` + Value *float64 `json:"value,omitempty"` +} + +// JSONFloat64 handles NaN values properly in JSON +type JSONFloat64 float64 + +func (f JSONFloat64) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + return []byte("null"), nil + } + return json.Marshal(float64(f)) +} + +// TopicStats tracks message statistics for subscribers +type TopicStats struct { + MessagesReceived map[string]int + LastMessageTime map[string]time.Time +} diff --git a/internal/weather/utils.go b/internal/weather/utils.go new file mode 100644 index 0000000..b59949a --- /dev/null +++ b/internal/weather/utils.go @@ -0,0 +1,82 @@ +// internal/weather/utils.go +package weather + +import ( + "os" + "strconv" + "strings" + "time" +) + +// GetEnv gets environment variable with default +func GetEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +// ParseDuration parses duration string +func ParseDuration(s string) time.Duration { + d, err := time.ParseDuration(s) + if err != nil { + return 5 * time.Minute + } + return d +} + +// ParseInt parses integer string +func ParseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + return 0 + } + return i +} + +// ParseFloat parses float string +func ParseFloat(s string) float64 { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0 + } + return f +} + +// ParseTopics parses comma-separated topics +func ParseTopics(s string) []string { + if s == "" { + return []string{ + "weather/obs/fmi/#", + "weather/forecast/fmi/#", + } + } + return strings.Split(s, ",") +} + +// LoadWatermark loads last observation time from file +func LoadWatermark(filename string) time.Time { + b, err := os.ReadFile(filename) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64) + if err != nil { + return time.Now().UTC().Add(-24 * time.Hour) + } + return time.Unix(sec, 0).UTC() +} + +// StoreWatermark stores observation time to file +func StoreWatermark(filename string, t time.Time) error { + data := []byte(strconv.FormatInt(t.Unix(), 10)) + return os.WriteFile(filename, data, 0644) +} + +// InitTopicStats initializes topic statistics +func InitTopicStats() *TopicStats { + return &TopicStats{ + MessagesReceived: make(map[string]int), + LastMessageTime: make(map[string]time.Time), + } +} |
