aboutsummaryrefslogtreecommitdiffstats
path: root/main.go
diff options
context:
space:
mode:
authorPetri Hienonen <petri.hienonen@gmail.com>2026-01-02 11:39:29 +0200
committerPetri Hienonen <petri.hienonen@gmail.com>2026-01-02 11:39:29 +0200
commit11392621501d7b5206bc521d6d2a50e2c4e36c45 (patch)
tree7b2d51c89d58f7374fd464ba05d8b60b6df6a91a /main.go
downloadpub-11392621501d7b5206bc521d6d2a50e2c4e36c45.tar.zst
Initial commit
Diffstat (limited to 'main.go')
-rw-r--r--main.go769
1 files changed, 769 insertions, 0 deletions
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..f9afd40
--- /dev/null
+++ b/main.go
@@ -0,0 +1,769 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "net/url"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+)
+
+/* ============================================================
+ Configuration (now configurable)
+ ============================================================ */
+
+type Config struct {
+ // FMI API
+ FMIEndpoint string `env:"FMI_ENDPOINT" default:"https://opendata.fmi.fi/wfs"`
+
+ // Observation
+ StationID int `env:"STATION_ID" default:"100968"`
+ ObsPollEvery time.Duration `env:"OBS_POLL_EVERY" default:"5m"`
+ ObsSafetyLag time.Duration `env:"OBS_SAFETY_LAG" default:"2m"`
+ ObsTimestep int `env:"OBS_TIMESTEP" default:"60"`
+ WatermarkFile string `env:"WATERMARK_FILE" default:"obs_watermark.txt"`
+
+ // Forecast
+ FcPollEvery time.Duration `env:"FC_POLL_EVERY" default:"1h"`
+ HelLat float64 `env:"HEL_LAT" default:"60.1699"`
+ HelLon float64 `env:"HEL_LON" default:"24.9384"`
+
+ // MQTT
+ MQTTBroker string `env:"MQTT_BROKER" default:"tcp://localhost:1883"`
+ MQTTClientID string `env:"MQTT_CLIENT_ID" default:"fmi-poller"`
+ MQTTUsername string `env:"MQTT_USERNAME"`
+ MQTTPassword string `env:"MQTT_PASSWORD"`
+
+ // Application
+ LogLevel string `env:"LOG_LEVEL" default:"info"`
+ HTTPTimeout time.Duration `env:"HTTP_TIMEOUT" default:"30s"`
+ MaxRetries int `env:"MAX_RETRIES" default:"3"`
+ RetryDelay time.Duration `env:"RETRY_DELAY" default:"5s"`
+}
+
+/* ============================================================
+ Application State
+ ============================================================ */
+
+type App struct {
+ config *Config
+ logger *slog.Logger
+ mqttClient mqtt.Client
+ httpClient *http.Client
+
+ // State
+ lastObsWatermark atomic.Value // stores time.Time
+ lastForecastRun atomic.Value // stores time.Time
+ shutdownCtx context.Context
+ shutdownCancel context.CancelFunc
+
+ // Wait group for graceful shutdown
+ wg sync.WaitGroup
+}
+
+/* ============================================================
+ Domain models
+ ============================================================ */
+
+type Observation struct {
+ Station int `json:"station"`
+ Parameter string `json:"parameter"`
+ Time time.Time `json:"time"`
+ Value float64 `json:"value"`
+}
+
+type ForecastValue struct {
+ Location struct {
+ Lat float64 `json:"lat"`
+ Lon float64 `json:"lon"`
+ } `json:"location"`
+
+ Model string `json:"model"`
+ RunTime time.Time `json:"run_time"`
+ ForecastTime time.Time `json:"forecast_time"`
+ Parameter string `json:"parameter"`
+ Value float64 `json:"value"`
+}
+
+/* ============================================================
+ Initialization
+ ============================================================ */
+
+func NewApp() (*App, error) {
+ config := loadConfig()
+
+ // Setup logging
+ logLevel := parseLogLevel(config.LogLevel)
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: logLevel,
+ }))
+
+ // Create HTTP client with timeout
+ httpClient := &http.Client{
+ Timeout: config.HTTPTimeout,
+ Transport: &http.Transport{
+ MaxIdleConns: 10,
+ IdleConnTimeout: 30 * time.Second,
+ DisableCompression: false,
+ },
+ }
+
+ // Create MQTT client
+ mqttOpts := mqtt.NewClientOptions().
+ AddBroker(config.MQTTBroker).
+ SetClientID(fmt.Sprintf("%s-%d", config.MQTTClientID, os.Getpid())).
+ SetCleanSession(true).
+ SetAutoReconnect(true).
+ SetMaxReconnectInterval(10 * time.Second).
+ SetConnectionLostHandler(func(c mqtt.Client, err error) {
+ logger.Error("MQTT connection lost", "error", err)
+ }).
+ SetOnConnectHandler(func(c mqtt.Client) {
+ logger.Info("MQTT connected successfully")
+ })
+
+ if config.MQTTUsername != "" {
+ mqttOpts.SetUsername(config.MQTTUsername)
+ if config.MQTTPassword != "" {
+ mqttOpts.SetPassword(config.MQTTPassword)
+ }
+ }
+
+ mqttClient := mqtt.NewClient(mqttOpts)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ app := &App{
+ config: config,
+ logger: logger,
+ httpClient: httpClient,
+ mqttClient: mqttClient,
+ shutdownCtx: ctx,
+ shutdownCancel: cancel,
+ }
+
+ // Initialize atomic values
+ app.lastObsWatermark.Store(loadWatermark(config.WatermarkFile))
+ app.lastForecastRun.Store(time.Time{})
+
+ return app, nil
+}
+
+func (app *App) ConnectMQTT() error {
+ token := app.mqttClient.Connect()
+ if !token.WaitTimeout(app.config.HTTPTimeout) {
+ return errors.New("MQTT connection timeout")
+ }
+ if err := token.Error(); err != nil {
+ return fmt.Errorf("MQTT connection failed: %w", err)
+ }
+ app.logger.Info("Connected to MQTT broker")
+ return nil
+}
+
+/* ============================================================
+ Configuration loading
+ ============================================================ */
+
+func loadConfig() *Config {
+ cfg := &Config{
+ FMIEndpoint: getEnv("FMI_ENDPOINT", "https://opendata.fmi.fi/wfs"),
+ StationID: parseInt(getEnv("STATION_ID", "100968")),
+ ObsPollEvery: parseDuration(getEnv("OBS_POLL_EVERY", "5m")),
+ ObsSafetyLag: parseDuration(getEnv("OBS_SAFETY_LAG", "2m")),
+ ObsTimestep: parseInt(getEnv("OBS_TIMESTEP", "60")),
+ WatermarkFile: getEnv("WATERMARK_FILE", "obs_watermark.txt"),
+ FcPollEvery: parseDuration(getEnv("FC_POLL_EVERY", "1h")),
+ HelLat: parseFloat(getEnv("HEL_LAT", "60.1699")),
+ HelLon: parseFloat(getEnv("HEL_LON", "24.9384")),
+ MQTTBroker: getEnv("MQTT_BROKER", "tcp://localhost:1883"),
+ MQTTClientID: getEnv("MQTT_CLIENT_ID", "fmi-poller"),
+ MQTTUsername: getEnv("MQTT_USERNAME", ""),
+ MQTTPassword: getEnv("MQTT_PASSWORD", ""),
+ LogLevel: getEnv("LOG_LEVEL", "info"),
+ HTTPTimeout: parseDuration(getEnv("HTTP_TIMEOUT", "30s")),
+ MaxRetries: parseInt(getEnv("MAX_RETRIES", "3")),
+ RetryDelay: parseDuration(getEnv("RETRY_DELAY", "5s")),
+ }
+ return cfg
+}
+
+func getEnv(key, defaultValue string) string {
+ if value, exists := os.LookupEnv(key); exists {
+ return value
+ }
+ return defaultValue
+}
+
+func parseDuration(s string) time.Duration {
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ panic(fmt.Sprintf("invalid duration %s: %v", s, err))
+ }
+ return d
+}
+
+func parseInt(s string) int {
+ i, err := strconv.Atoi(s)
+ if err != nil {
+ panic(fmt.Sprintf("invalid int %s: %v", s, err))
+ }
+ return i
+}
+
+func parseFloat(s string) float64 {
+ f, err := strconv.ParseFloat(s, 64)
+ if err != nil {
+ panic(fmt.Sprintf("invalid float %s: %v", s, err))
+ }
+ return f
+}
+
+func parseLogLevel(level string) slog.Level {
+ switch strings.ToLower(level) {
+ case "debug":
+ return slog.LevelDebug
+ case "warn", "warning":
+ return slog.LevelWarn
+ case "error":
+ return slog.LevelError
+ default:
+ return slog.LevelInfo
+ }
+}
+
+/* ============================================================
+ Watermark persistence
+ ============================================================ */
+
+func loadWatermark(filename string) time.Time {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ // If file doesn't exist, start from 24 hours ago
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
+ if err != nil {
+ return time.Now().UTC().Add(-24 * time.Hour)
+ }
+ return time.Unix(sec, 0).UTC()
+}
+
+func storeWatermark(filename string, t time.Time) error {
+ data := []byte(fmt.Sprintf("%d", t.Unix()))
+ return os.WriteFile(filename, data, 0644)
+}
+
+/* ============================================================
+ Retry helper
+ ============================================================ */
+
+func withRetry(app *App, name string, maxRetries int, fn func() error) error {
+ var lastErr error
+ for i := 0; i < maxRetries; i++ {
+ if err := fn(); err != nil {
+ lastErr = err
+ app.logger.Warn("Operation failed, retrying",
+ "operation", name,
+ "attempt", i+1,
+ "max_attempts", maxRetries,
+ "error", err)
+
+ if i < maxRetries-1 {
+ select {
+ case <-time.After(app.config.RetryDelay):
+ continue
+ case <-app.shutdownCtx.Done():
+ return app.shutdownCtx.Err()
+ }
+ }
+ } else {
+ return nil
+ }
+ }
+ return fmt.Errorf("%s failed after %d attempts: %w", name, maxRetries, lastErr)
+}
+
+/* ============================================================
+ HTTP helpers with retry
+ ============================================================ */
+
+func (app *App) fetchWithRetry(url string) (io.ReadCloser, error) {
+ var body io.ReadCloser
+ err := withRetry(app, "HTTP fetch", app.config.MaxRetries, func() error {
+ req, err := http.NewRequestWithContext(app.shutdownCtx, http.MethodGet, url, nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Accept", "application/xml")
+ req.Header.Set("User-Agent", "FMI-Weather-Poller/1.0")
+
+ resp, err := app.httpClient.Do(req)
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ resp.Body.Close()
+ return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
+ }
+
+ body = resp.Body
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+ return body, nil
+}
+
+/* ============================================================
+ Observation polling
+ ============================================================ */
+
+func (app *App) fetchObservations(start, end time.Time) (io.ReadCloser, error) {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::observations::weather::timevaluepair")
+ q.Set("fmisid", fmt.Sprintf("%d", app.config.StationID))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+ q.Set("timestep", fmt.Sprintf("%d", app.config.ObsTimestep))
+
+ url := app.config.FMIEndpoint + "?" + q.Encode()
+ app.logger.Debug("Fetching observations", "url", url)
+ return app.fetchWithRetry(url)
+}
+
+func (app *App) parseObservations(r io.Reader, minTime time.Time) ([]Observation, time.Time, error) {
+ dec := xml.NewDecoder(r)
+
+ var (
+ param string
+ ts time.Time
+ maxObserved = minTime
+ out []Observation
+ parseErr error
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, maxObserved, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ param = normalizeParameter(a.Value)
+ }
+ }
+ case "time":
+ var s string
+ if err := dec.DecodeElement(&s, &el); err != nil {
+ parseErr = fmt.Errorf("failed to decode time: %w", err)
+ continue
+ }
+ ts, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
+ if err != nil {
+ parseErr = fmt.Errorf("failed to parse time %s: %w", s, err)
+ continue
+ }
+ case "value":
+ var v float64
+ if err := dec.DecodeElement(&v, &el); err != nil {
+ parseErr = fmt.Errorf("failed to decode value: %w", err)
+ continue
+ }
+ if ts.After(minTime) {
+ out = append(out, Observation{
+ Station: -1, // Will be set by caller
+ Parameter: param,
+ Time: ts,
+ Value: v,
+ })
+ if ts.After(maxObserved) {
+ maxObserved = ts
+ }
+ }
+ }
+ }
+ }
+
+ if parseErr != nil {
+ app.logger.Warn("Partial parse errors", "error", parseErr)
+ }
+
+ return out, maxObserved, nil
+}
+
+/* ============================================================
+ Forecast polling
+ ============================================================ */
+
+func (app *App) fetchForecast(start, end time.Time) (io.ReadCloser, error) {
+ q := url.Values{}
+ q.Set("service", "WFS")
+ q.Set("version", "2.0.0")
+ q.Set("request", "GetFeature")
+ q.Set("storedquery_id", "fmi::forecast::harmonie::surface::point::simple")
+ q.Set("latlon", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon))
+ q.Set("starttime", start.Format(time.RFC3339))
+ q.Set("endtime", end.Format(time.RFC3339))
+
+ url := app.config.FMIEndpoint + "?" + q.Encode()
+ app.logger.Debug("Fetching forecast", "url", url)
+ return app.fetchWithRetry(url)
+}
+
+func (app *App) parseForecast(r io.Reader) ([]ForecastValue, time.Time, error) {
+ dec := xml.NewDecoder(r)
+
+ var (
+ param string
+ runTime time.Time
+ fcTime time.Time
+ out []ForecastValue
+ parseErr error
+ )
+
+ for {
+ tok, err := dec.Token()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, runTime, err
+ }
+
+ switch el := tok.(type) {
+ case xml.StartElement:
+ switch el.Name.Local {
+ case "resultTime":
+ var s string
+ if err := dec.DecodeElement(&s, &el); err != nil {
+ parseErr = fmt.Errorf("failed to decode resultTime: %w", err)
+ continue
+ }
+ runTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
+ if err != nil {
+ parseErr = fmt.Errorf("failed to parse resultTime %s: %w", s, err)
+ continue
+ }
+ case "MeasurementTimeseries":
+ for _, a := range el.Attr {
+ if a.Name.Local == "id" {
+ param = normalizeParameter(a.Value)
+ }
+ }
+ case "time":
+ var s string
+ if err := dec.DecodeElement(&s, &el); err != nil {
+ parseErr = fmt.Errorf("failed to decode forecast time: %w", err)
+ continue
+ }
+ fcTime, err = time.Parse(time.RFC3339, strings.TrimSpace(s))
+ if err != nil {
+ parseErr = fmt.Errorf("failed to parse forecast time %s: %w", s, err)
+ continue
+ }
+ case "value":
+ var v float64
+ if err := dec.DecodeElement(&v, &el); err != nil {
+ parseErr = fmt.Errorf("failed to decode forecast value: %w", err)
+ continue
+ }
+ out = append(out, ForecastValue{
+ Model: "harmonie",
+ RunTime: runTime,
+ ForecastTime: fcTime,
+ Parameter: param,
+ Value: v,
+ })
+ }
+ }
+ }
+
+ if parseErr != nil {
+ app.logger.Warn("Partial parse errors in forecast", "error", parseErr)
+ }
+
+ return out, runTime, nil
+}
+
+/* ============================================================
+ Polling workers
+ ============================================================ */
+
+func (app *App) runObservationPoller() {
+ defer app.wg.Done()
+
+ app.logger.Info("Starting observation poller",
+ "poll_interval", app.config.ObsPollEvery,
+ "station", app.config.StationID)
+
+ ticker := time.NewTicker(app.config.ObsPollEvery)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-app.shutdownCtx.Done():
+ app.logger.Info("Observation poller stopping")
+ return
+ case <-ticker.C:
+ app.pollObservations()
+ }
+ }
+}
+
+func (app *App) pollObservations() {
+ startTime := time.Now()
+ lastObs := app.lastObsWatermark.Load().(time.Time)
+ obsEnd := time.Now().UTC().Add(-app.config.ObsSafetyLag)
+
+ if !obsEnd.After(lastObs) {
+ return // Nothing new to fetch
+ }
+
+ app.logger.Info("Polling observations",
+ "start", lastObs,
+ "end", obsEnd)
+
+ body, err := app.fetchObservations(lastObs, obsEnd)
+ if err != nil {
+ app.logger.Error("Failed to fetch observations", "error", err)
+ return
+ }
+ defer body.Close()
+
+ obs, maxT, err := app.parseObservations(body, lastObs)
+ if err != nil {
+ app.logger.Error("Failed to parse observations", "error", err)
+ return
+ }
+
+ // Set station ID for all observations
+ for i := range obs {
+ obs[i].Station = app.config.StationID
+ }
+
+ if len(obs) > 0 {
+ if err := app.publishObs(obs); err != nil {
+ app.logger.Error("Failed to publish observations", "error", err)
+ return
+ }
+
+ app.lastObsWatermark.Store(maxT)
+ if err := storeWatermark(app.config.WatermarkFile, maxT); err != nil {
+ app.logger.Error("Failed to store watermark", "error", err)
+ }
+
+ app.logger.Info("Published observations",
+ "count", len(obs),
+ "new_watermark", maxT,
+ "duration", time.Since(startTime))
+ } else {
+ app.logger.Debug("No new observations")
+ }
+}
+
+func (app *App) runForecastPoller() {
+ defer app.wg.Done()
+
+ app.logger.Info("Starting forecast poller",
+ "poll_interval", app.config.FcPollEvery,
+ "location", fmt.Sprintf("%.4f,%.4f", app.config.HelLat, app.config.HelLon))
+
+ ticker := time.NewTicker(app.config.FcPollEvery)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-app.shutdownCtx.Done():
+ app.logger.Info("Forecast poller stopping")
+ return
+ case <-ticker.C:
+ app.pollForecast()
+ }
+ }
+}
+
+func (app *App) pollForecast() {
+ startTime := time.Now()
+ now := time.Now().UTC()
+ start := now
+ end := now.Add(48 * time.Hour)
+
+ app.logger.Info("Polling forecast",
+ "start", start,
+ "end", end)
+
+ body, err := app.fetchForecast(start, end)
+ if err != nil {
+ app.logger.Error("Failed to fetch forecast", "error", err)
+ return
+ }
+ defer body.Close()
+
+ fc, runTime, err := app.parseForecast(body)
+ if err != nil {
+ app.logger.Error("Failed to parse forecast", "error", err)
+ return
+ }
+
+ // Set location for all forecasts
+ for i := range fc {
+ fc[i].Location.Lat = app.config.HelLat
+ fc[i].Location.Lon = app.config.HelLon
+ }
+
+ if len(fc) > 0 {
+ // Check if this is a new forecast run
+ lastRun := app.lastForecastRun.Load().(time.Time)
+ if runTime.After(lastRun) {
+ if err := app.publishForecast(fc); err != nil {
+ app.logger.Error("Failed to publish forecast", "error", err)
+ return
+ }
+
+ app.lastForecastRun.Store(runTime)
+ app.logger.Info("Published forecast",
+ "count", len(fc),
+ "run_time", runTime,
+ "duration", time.Since(startTime))
+ } else {
+ app.logger.Debug("Forecast run already published", "run_time", runTime)
+ }
+ }
+}
+
+/* ============================================================
+ MQTT publishing with error handling
+ ============================================================ */
+
+func (app *App) publishObs(obs []Observation) error {
+ for _, o := range obs {
+ topic := fmt.Sprintf("weather/obs/fmi/%d/%s", o.Station, o.Parameter)
+ b, err := json.Marshal(o)
+ if err != nil {
+ return fmt.Errorf("failed to marshal observation: %w", err)
+ }
+
+ token := app.mqttClient.Publish(topic, 1, false, b)
+ if !token.WaitTimeout(app.config.HTTPTimeout) {
+ return errors.New("MQTT publish timeout")
+ }
+ if err := token.Error(); err != nil {
+ return fmt.Errorf("failed to publish observation: %w", err)
+ }
+ }
+ return nil
+}
+
+func (app *App) publishForecast(fc []ForecastValue) error {
+ for _, v := range fc {
+ topic := fmt.Sprintf(
+ "weather/forecast/fmi/harmonie/helsinki/run=%s/%s",
+ v.RunTime.Format(time.RFC3339),
+ v.Parameter,
+ )
+ b, err := json.Marshal(v)
+ if err != nil {
+ return fmt.Errorf("failed to marshal forecast: %w", err)
+ }
+
+ token := app.mqttClient.Publish(topic, 1, true, b)
+ if !token.WaitTimeout(app.config.HTTPTimeout) {
+ return errors.New("MQTT publish timeout")
+ }
+ if err := token.Error(); err != nil {
+ return fmt.Errorf("failed to publish forecast: %w", err)
+ }
+ }
+ return nil
+}
+
+/* ============================================================
+ Utilities
+ ============================================================ */
+
+func normalizeParameter(id string) string {
+ return strings.TrimPrefix(strings.ToLower(id), "ts_")
+}
+
+/* ============================================================
+ Main
+ ============================================================ */
+
+func main() {
+ // Create and initialize app
+ app, err := NewApp()
+ if err != nil {
+ panic(fmt.Sprintf("Failed to initialize app: %v", err))
+ }
+ defer app.shutdownCancel()
+
+ // Setup signal handling
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+
+ // Connect to MQTT
+ if err := app.ConnectMQTT(); err != nil {
+ app.logger.Error("Failed to connect to MQTT", "error", err)
+ os.Exit(1)
+ }
+ defer app.mqttClient.Disconnect(250)
+
+ // Start pollers
+ app.wg.Add(2)
+ go app.runObservationPoller()
+ go app.runForecastPoller()
+
+ app.logger.Info("Weather poller started")
+
+ // Wait for shutdown signal
+ select {
+ case sig := <-signalChan:
+ app.logger.Info("Received shutdown signal", "signal", sig)
+ app.shutdownCancel()
+ case <-app.shutdownCtx.Done():
+ app.logger.Info("Shutdown initiated")
+ }
+
+ // Wait for graceful shutdown
+ shutdownDone := make(chan struct{})
+ go func() {
+ app.wg.Wait()
+ close(shutdownDone)
+ }()
+
+ select {
+ case <-shutdownDone:
+ app.logger.Info("Shutdown completed")
+ case <-time.After(30 * time.Second):
+ app.logger.Warn("Shutdown timed out")
+ }
+}