aboutsummaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
authorPetri Hienonen <petri.hienonen@gmail.com>2026-01-04 11:52:47 +0200
committerPetri Hienonen <petri.hienonen@gmail.com>2026-01-04 11:52:47 +0200
commit59491201976316a30ffc475dd99b0af02b5e997d (patch)
treeecf395594d5d289d855eba16f786e0fb66c1d814 /internal
parent4e0ca0509c6b314eea8a7b2df6d093f5d9b7e70f (diff)
downloadpub-59491201976316a30ffc475dd99b0af02b5e997d.tar.zst
Both publisher and subscriber
Diffstat (limited to 'internal')
-rw-r--r--internal/weather/config.go102
-rw-r--r--internal/weather/fmi.go258
-rw-r--r--internal/weather/mqtt.go76
-rw-r--r--internal/weather/types.go44
-rw-r--r--internal/weather/utils.go82
5 files changed, 562 insertions, 0 deletions
diff --git a/internal/weather/config.go b/internal/weather/config.go
new file mode 100644
index 0000000..6450da4
--- /dev/null
+++ b/internal/weather/config.go
@@ -0,0 +1,102 @@
+package weather
+
+import (
+ "time"
+)
+
+// PublisherConfig configuration for the publisher
+type PublisherConfig struct {
+ // FMI API
+ FMIEndpoint string `env:"FMI_ENDPOINT"`
+
+ // Observation
+ StationID int `env:"STATION_ID"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG"`
+ ObsTimestep int `env:"OBS_TIMESTEP"`
+ WatermarkFile string `env:"WATERMARK_FILE"`
+
+ // Forecast
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY"`
+ HelLat float64 `env:"HEL_LAT"`
+ HelLon float64 `env:"HEL_LON"`
+
+ // MQTT
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT"`
+}
+
+// SubscriberConfig configuration for the subscriber
+type SubscriberConfig struct {
+ // MQTT Configuration
+ MQTTBroker string `env:"MQTT_BROKER"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+ MQTTKeepAlive time.Duration `env:"MQTT_KEEP_ALIVE"`
+ MQTTSessionExp time.Duration `env:"MQTT_SESSION_EXP"`
+
+ // Subscription Configuration
+ Topics []string `env:"SUBSCRIBE_TOPICS"`
+ QoS int `env:"SUBSCRIBE_QOS"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL"`
+ EnableDebug bool `env:"ENABLE_DEBUG"`
+}
+
+// LoadPublisherConfig loads publisher configuration from environment
+func LoadPublisherConfig() *PublisherConfig {
+ cfg := &PublisherConfig{
+ FMIEndpoint: GetEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: ParseInt(GetEnv("STATION_ID", "100968")),
+ ObsPollEvery: ParseDuration(GetEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: ParseDuration(GetEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: ParseInt(GetEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: GetEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: ParseDuration(GetEnv("FC_POLL_EVERY", "1h")),
+ HelLat: ParseFloat(GetEnv("HEL_LAT", "60.1699")),
+ HelLon: ParseFloat(GetEnv("HEL_LON", "24.9384")),
+ MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-publisher"),
+ MQTTUsername: GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")),
+ LogLevel: GetEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: ParseDuration(GetEnv("HTTP_TIMEOUT", "30s")),
+ }
+
+ return cfg
+}
+
+// LoadSubscriberConfig loads subscriber configuration from environment
+func LoadSubscriberConfig() *SubscriberConfig {
+ qos := ParseInt(GetEnv("SUBSCRIBE_QOS", "1"))
+ if qos < 0 || qos > 2 {
+ qos = 1
+ }
+
+ cfg := &SubscriberConfig{
+ MQTTBroker: GetEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: GetEnv("MQTT_CLIENT_ID", "fmi-subscriber"),
+ MQTTUsername: GetEnv("MQTT_USERNAME", ""),
+ MQTTPassword: GetEnv("MQTT_PASSWORD", ""),
+ MQTTKeepAlive: ParseDuration(GetEnv("MQTT_KEEP_ALIVE", "60s")),
+ MQTTSessionExp: ParseDuration(GetEnv("MQTT_SESSION_EXP", "1h")),
+ Topics: ParseTopics(GetEnv("SUBSCRIBE_TOPICS", "")),
+ QoS: qos,
+ LogLevel: GetEnv("LOG_LEVEL", "info"),
+ EnableDebug: GetEnv("ENABLE_DEBUG", "false") == "true",
+ }
+
+ return cfg
+}
diff --git a/internal/weather/fmi.go b/internal/weather/fmi.go
new file mode 100644
index 0000000..52c9f63
--- /dev/null
+++ b/internal/weather/fmi.go
@@ -0,0 +1,258 @@
+package weather
+
+import (
+ "bytes"
+ "context"
+ "encoding/xml"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// BuildObservationsURL constructs the URL for fetching observations from FMI.
+func BuildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
+ q.Set("fmisid", fmt.Sprintf("%d", stationID))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+ q.Set("timestep", fmt.Sprintf("%d", timestep))
+
+ return endpoint + "?" + q.Encode()
+}
+
+// BuildForecastURL constructs the URL for fetching forecasts from FMI.
+func BuildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
+ q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+
+ return endpoint + "?" + q.Encode()
+}
+
+// FetchData fetches data from the given URL with a timeout.
+func FetchData(url string, timeout time.Duration) ([]byte, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Accept", "application/xml")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
+ }
+
+ return io.ReadAll(resp.Body)
+}
+
+// ParseObservations parses the XML response from FMI observations query.
+func ParseObservations(data []byte, stationID int, minTime time.Time) ([]Observation, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
+
+ var (
+ param string
+ ts time.Time
+ maxObserved = minTime
+ out []Observation
+ inTVP bool
+ inTime bool
+ inValue bool
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, maxObserved, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
+ }
+ }
+ case "MeasurementTVP":
+ inTVP = true
+ ts = time.Time{}
+ case "time":
+ if inTVP {
+ inTime = true
+ }
+ case "value":
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ ts = parsedTime
+ }
+ } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
+ }
+ }
+
+ out = append(out, Observation{
+ Station: stationID,
+ Parameter: param,
+ Time: ts,
+ Value: valuePtr,
+ })
+
+ if ts.After(maxObserved) {
+ maxObserved = ts
+ }
+ }
+
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
+ }
+
+ return out, maxObserved, nil
+}
+
+// ParseForecast parses the XML response from FMI forecast query.
+func ParseForecast(data []byte, lat, lon float64) ([]ForecastValue, time.Time, error) {
+ dec := xml.NewDecoder(bytes.NewReader(data))
+
+ var (
+ param string
+ runTime time.Time
+ fcTime time.Time
+ out []ForecastValue
+ inTVP bool
+ inTime bool
+ inValue bool
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, runTime, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "resultTime":
+ var s string
+ if err := dec.DecodeElement(&s, &el); err == nil {
+ if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil {
+ runTime = parsedTime
+ }
+ }
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ parts := strings.Split(a.Value, "-")
+ if len(parts) > 0 {
+ param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_"))
+ }
+ }
+ }
+ case "MeasurementTVP":
+ inTVP = true
+ fcTime = time.Time{}
+ case "time":
+ if inTVP {
+ inTime = true
+ }
+ case "value":
+ if inTVP {
+ inValue = true
+ }
+ }
+
+ case xml.CharData:
+ if inTime && inTVP {
+ s := strings.TrimSpace(string(el))
+ if parsedTime, err := time.Parse(time.RFC3339, s); err == nil {
+ fcTime = parsedTime
+ }
+ } else if inValue && inTVP && !fcTime.IsZero() {
+ s := strings.TrimSpace(string(el))
+ var valuePtr *float64
+
+ if !strings.EqualFold(s, "NaN") && s != "" {
+ if value, err := strconv.ParseFloat(s, 64); err == nil {
+ valuePtr = &value
+ }
+ }
+
+ fc := ForecastValue{
+ Location: struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ }{lat, lon},
+ Model: "harmonie",
+ RunTime: runTime,
+ ForecastTime: fcTime,
+ Parameter: param,
+ Value: valuePtr,
+ }
+ out = append(out, fc)
+ }
+
+ case xml.EndElement:
+ switch el.Name.Local {
+ case "MeasurementTVP":
+ inTVP = false
+ case "time":
+ inTime = false
+ case "value":
+ inValue = false
+ }
+ }
+ }
+
+ return out, runTime, nil
+}
diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go
new file mode 100644
index 0000000..211b441
--- /dev/null
+++ b/internal/weather/mqtt.go
@@ -0,0 +1,76 @@
+package weather
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "net/url"
+ "os"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+)
+
+// MQTTConfig holds MQTT connection configuration
+type MQTTConfig struct {
+ Broker string
+ ClientID string
+ Username string
+ Password string
+ KeepAlive time.Duration
+ SessionExpiry time.Duration
+ Timeout time.Duration
+ OnPublishReceived []func(paho.PublishReceived) (bool, error)
+}
+
+// CreateMQTTClient creates a MQTT client with the given configuration
+func CreateMQTTClient(ctx context.Context, cfg MQTTConfig,
+ onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) {
+
+ serverURL, err := url.Parse(cfg.Broker)
+ if err != nil {
+ return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.KeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()),
+ OnPublishReceived: cfg.OnPublishReceived,
+ OnServerDisconnect: func(d *paho.Disconnect) {
+
+ // Default disconnect handler
+ },
+ },
+ ConnectUsername: cfg.Username,
+ ConnectPassword: []byte(cfg.Password),
+ ConnectTimeout: cfg.Timeout,
+ }
+
+ cliCfg.CleanStartOnInitialConnection = false
+ cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds())
+
+ if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
+ cliCfg.TlsCfg = &tls.Config{
+ MinVersion: tls.VersionTLS12,
+ }
+ }
+
+ if onConnect != nil {
+ cliCfg.OnConnectionUp = onConnect
+ }
+
+ cm, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ err = cm.AwaitConnection(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
+ }
+
+ return cm, nil
+}
diff --git a/internal/weather/types.go b/internal/weather/types.go
new file mode 100644
index 0000000..5563bfa
--- /dev/null
+++ b/internal/weather/types.go
@@ -0,0 +1,44 @@
+package weather
+
+import (
+ "encoding/json"
+ "math"
+ "time"
+)
+
+// Observation represents a weather observation from a station
+type Observation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value *float64 `json:"value,omitempty"`
+}
+
+// ForecastValue represents a weather forecast value
+type ForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value *float64 `json:"value,omitempty"`
+}
+
+// JSONFloat64 handles NaN values properly in JSON
+type JSONFloat64 float64
+
+func (f JSONFloat64) MarshalJSON() ([]byte, error) {
+ if math.IsNaN(float64(f)) {
+ return []byte("null"), nil
+ }
+ return json.Marshal(float64(f))
+}
+
+// TopicStats tracks message statistics for subscribers
+type TopicStats struct {
+ MessagesReceived map[string]int
+ LastMessageTime map[string]time.Time
+}
diff --git a/internal/weather/utils.go b/internal/weather/utils.go
new file mode 100644
index 0000000..b59949a
--- /dev/null
+++ b/internal/weather/utils.go
@@ -0,0 +1,82 @@
+// internal/weather/utils.go
+package weather
+
+import (
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// GetEnv gets environment variable with default
+func GetEnv(key, defaultValue string) string {
+ if value := os.Getenv(key); value != "" {
+ return value
+ }
+ return defaultValue
+}
+
+// ParseDuration parses duration string
+func ParseDuration(s string) time.Duration {
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return 5 * time.Minute
+ }
+ return d
+}
+
+// ParseInt parses integer string
+func ParseInt(s string) int {
+ i, err := strconv.Atoi(s)
+ if err != nil {
+ return 0
+ }
+ return i
+}
+
+// ParseFloat parses float string
+func ParseFloat(s string) float64 {
+ f, err := strconv.ParseFloat(s, 64)
+ if err != nil {
+ return 0
+ }
+ return f
+}
+
+// ParseTopics parses comma-separated topics
+func ParseTopics(s string) []string {
+ if s == "" {
+ return []string{
+ "weather/obs/fmi/#",
+ "weather/forecast/fmi/#",
+ }
+ }
+ return strings.Split(s, ",")
+}
+
+// LoadWatermark loads last observation time from file
+func LoadWatermark(filename string) time.Time {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ return time.Unix(sec, 0).UTC()
+}
+
+// StoreWatermark stores observation time to file
+func StoreWatermark(filename string, t time.Time) error {
+ data := []byte(strconv.FormatInt(t.Unix(), 10))
+ return os.WriteFile(filename, data, 0644)
+}
+
+// InitTopicStats initializes topic statistics
+func InitTopicStats() *TopicStats {
+ return &TopicStats{
+ MessagesReceived: make(map[string]int),
+ LastMessageTime: make(map[string]time.Time),
+ }
+}