From 59491201976316a30ffc475dd99b0af02b5e997d Mon Sep 17 00:00:00 2001 From: Petri Hienonen Date: Sun, 4 Jan 2026 11:52:47 +0200 Subject: Both publisher and subscriber --- internal/weather/fmi.go | 258 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 internal/weather/fmi.go (limited to 'internal/weather/fmi.go') diff --git a/internal/weather/fmi.go b/internal/weather/fmi.go new file mode 100644 index 0000000..52c9f63 --- /dev/null +++ b/internal/weather/fmi.go @@ -0,0 +1,258 @@ +package weather + +import ( + "bytes" + "context" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +// BuildObservationsURL constructs the URL for fetching observations from FMI. +func BuildObservationsURL(endpoint string, stationID int, start, end time.Time, timestep int) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::observations::weather::timevaluepair") + q.Set("fmisid", fmt.Sprintf("%d", stationID)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + q.Set("timestep", fmt.Sprintf("%d", timestep)) + + return endpoint + "?" + q.Encode() +} + +// BuildForecastURL constructs the URL for fetching forecasts from FMI. +func BuildForecastURL(endpoint string, lat, lon float64, start, end time.Time) string { + q := url.Values{} + q.Set("service", "WFS") + q.Set("version", "2.0.0") + q.Set("request", "GetFeature") + q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple") + q.Set("latlon", fmt.Sprintf("%.4f,%.4f", lat, lon)) + q.Set("starttime", start.Format(time.RFC3339)) + q.Set("endtime", end.Format(time.RFC3339)) + + return endpoint + "?" + q.Encode() +} + +// FetchData fetches data from the given URL with a timeout. +func FetchData(url string, timeout time.Duration) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/xml") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + return io.ReadAll(resp.Body) +} + +// ParseObservations parses the XML response from FMI observations query. +func ParseObservations(data []byte, stationID int, minTime time.Time) ([]Observation, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + ts time.Time + maxObserved = minTime + out []Observation + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, maxObserved, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + ts = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + ts = parsedTime + } + } else if inValue && inTVP && !ts.IsZero() && ts.After(minTime) { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + out = append(out, Observation{ + Station: stationID, + Parameter: param, + Time: ts, + Value: valuePtr, + }) + + if ts.After(maxObserved) { + maxObserved = ts + } + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, maxObserved, nil +} + +// ParseForecast parses the XML response from FMI forecast query. +func ParseForecast(data []byte, lat, lon float64) ([]ForecastValue, time.Time, error) { + dec := xml.NewDecoder(bytes.NewReader(data)) + + var ( + param string + runTime time.Time + fcTime time.Time + out []ForecastValue + inTVP bool + inTime bool + inValue bool + ) + + for { + tok, err := dec.Token() + if err == io.EOF { + break + } + if err != nil { + return nil, runTime, err + } + + switch el := tok.(type) { + case xml.StartElement: + switch el.Name.Local { + case "resultTime": + var s string + if err := dec.DecodeElement(&s, &el); err == nil { + if parsedTime, err := time.Parse(time.RFC3339, strings.TrimSpace(s)); err == nil { + runTime = parsedTime + } + } + case "MeasurementTimeseries": + for _, a := range el.Attr { + if a.Name.Local == "id" { + parts := strings.Split(a.Value, "-") + if len(parts) > 0 { + param = strings.ToLower(strings.TrimPrefix(parts[len(parts)-1], "ts_")) + } + } + } + case "MeasurementTVP": + inTVP = true + fcTime = time.Time{} + case "time": + if inTVP { + inTime = true + } + case "value": + if inTVP { + inValue = true + } + } + + case xml.CharData: + if inTime && inTVP { + s := strings.TrimSpace(string(el)) + if parsedTime, err := time.Parse(time.RFC3339, s); err == nil { + fcTime = parsedTime + } + } else if inValue && inTVP && !fcTime.IsZero() { + s := strings.TrimSpace(string(el)) + var valuePtr *float64 + + if !strings.EqualFold(s, "NaN") && s != "" { + if value, err := strconv.ParseFloat(s, 64); err == nil { + valuePtr = &value + } + } + + fc := ForecastValue{ + Location: struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + }{lat, lon}, + Model: "harmonie", + RunTime: runTime, + ForecastTime: fcTime, + Parameter: param, + Value: valuePtr, + } + out = append(out, fc) + } + + case xml.EndElement: + switch el.Name.Local { + case "MeasurementTVP": + inTVP = false + case "time": + inTime = false + case "value": + inValue = false + } + } + } + + return out, runTime, nil +} -- cgit v1.2.3-70-g09d2