package main import ( "context" "encoding/json" "fmt" "log/slog" "net/url" "os" "os/signal" "strings" "sync" "syscall" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/joho/godotenv" "hub/internal/weather" ) // MessageHandler wraps the function to process incoming messages type MessageHandler func(*paho.Publish) func main() { // Load environment variables _ = godotenv.Load() // Load configuration cfg := weather.LoadSubscriberConfig() // Setup logging var logLevel slog.Level switch strings.ToLower(cfg.LogLevel) { case "debug": logLevel = slog.LevelDebug case "warn", "warning": logLevel = slog.LevelWarn case "error": logLevel = slog.LevelError default: logLevel = slog.LevelInfo } logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: logLevel, })) // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Handle signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigChan logger.Info("Received signal", "signal", sig) cancel() }() // Run the subscriber logger.Info("Starting MQTT v5 subscriber for FMI weather data", "broker", cfg.MQTTBroker, "topics", cfg.Topics, "qos", cfg.QoS) if err := runSubscriber(ctx, cfg, logger); err != nil { logger.Error("Subscriber failed", "error", err) os.Exit(1) } logger.Info("Subscriber stopped gracefully") } func runSubscriber(ctx context.Context, cfg *weather.SubscriberConfig, logger *slog.Logger) error { // Initialize statistics stats := weather.InitTopicStats() statsMutex := &sync.RWMutex{} // Create MQTT client with custom configuration serverURL, err := url.Parse(cfg.MQTTBroker) if err != nil { return fmt.Errorf("invalid MQTT broker URL: %w", err) } // Create message handler messageHandler := func(pr paho.PublishReceived) (bool, error) { go handleMessage(pr.Packet, logger, stats, statsMutex) return true, nil } // Client configuration cliCfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), ClientConfig: paho.ClientConfig{ ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), OnPublishReceived: []func(paho.PublishReceived) (bool, error){ messageHandler, }, OnServerDisconnect: func(d *paho.Disconnect) { if d != nil && d.ReasonCode != 0 { logger.Warn("MQTT disconnected", "reason", d.ReasonCode) } else { logger.Info("MQTT disconnected gracefully") } }, }, ConnectUsername: cfg.MQTTUsername, ConnectPassword: []byte(cfg.MQTTPassword), ConnectTimeout: 30 * time.Second, } cliCfg.CleanStartOnInitialConnection = false cliCfg.SessionExpiryInterval = uint32(cfg.MQTTSessionExp.Seconds()) // Connection callbacks cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { logger.Info("MQTT v5 connected", "session_present", connAck.SessionPresent, "keep_alive", cfg.MQTTKeepAlive) // Subscribe to topics after connection is established if len(cfg.Topics) > 0 { subscriptions := make([]paho.SubscribeOptions, len(cfg.Topics)) for i, topic := range cfg.Topics { subscriptions[i] = paho.SubscribeOptions{ Topic: topic, QoS: byte(cfg.QoS), } } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // FIXED: SubscriptionIdentifier is a pointer subID := int(1) subProps := &paho.SubscribeProperties{ SubscriptionIdentifier: &subID, } if _, err := cm.Subscribe(ctx, &paho.Subscribe{ Subscriptions: subscriptions, Properties: subProps, }); err != nil { logger.Error("Failed to subscribe", "error", err) } else { logger.Info("Subscribed to topics", "topics", cfg.Topics, "qos", cfg.QoS) } } } cliCfg.OnConnectError = func(err error) { logger.Error("MQTT connection error", "error", err) } // Connect to broker mqttClient, err := autopaho.NewConnection(ctx, cliCfg) if err != nil { return fmt.Errorf("failed to create MQTT connection: %w", err) } // Wait for connection err = mqttClient.AwaitConnection(ctx) if err != nil { return fmt.Errorf("failed to establish MQTT connection: %w", err) } defer func() { discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := mqttClient.Disconnect(discCtx); err != nil { logger.Error("Failed to disconnect MQTT", "error", err) } }() logger.Info("Subscriber started and waiting for messages...") // Statistics reporting goroutine ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): // Print final statistics statsMutex.RLock() finalStats := stats.MessagesReceived statsMutex.RUnlock() if len(finalStats) > 0 { logger.Info("Final message statistics", "stats", finalStats) } return nil case <-ticker.C: if cfg.EnableDebug { statsMutex.RLock() currentStats := stats.MessagesReceived statsMutex.RUnlock() if len(currentStats) > 0 { logger.Debug("Message statistics", "stats", currentStats) } } } } } func handleMessage(pkt *paho.Publish, logger *slog.Logger, stats *weather.TopicStats, mutex *sync.RWMutex) { // Update statistics mutex.Lock() stats.MessagesReceived[pkt.Topic]++ stats.LastMessageTime[pkt.Topic] = time.Now() mutex.Unlock() // Route based on topic pattern switch { case strings.HasPrefix(pkt.Topic, "weather/obs/"): processObservationMessage(pkt, logger) case strings.HasPrefix(pkt.Topic, "weather/forecast/"): processForecastMessage(pkt, logger) default: logger.Warn("Unknown topic pattern", "topic", pkt.Topic) // Try to decode as generic JSON var data map[string]interface{} if err := json.Unmarshal(pkt.Payload, &data); err == nil { logger.Info("Generic message", "topic", pkt.Topic, "data", data) } else { logger.Info("Raw message", "topic", pkt.Topic, "payload", string(pkt.Payload)) } } } func processObservationMessage(pkt *paho.Publish, logger *slog.Logger) { var obs weather.Observation if err := json.Unmarshal(pkt.Payload, &obs); err != nil { logger.Error("Failed to parse observation", "error", err, "topic", pkt.Topic) return } // Extract user properties userProps := make(map[string]string) if pkt.Properties != nil { for _, prop := range pkt.Properties.User { userProps[prop.Key] = prop.Value } } // Format value for display valueStr := "null" if obs.Value != nil { valueStr = fmt.Sprintf("%.2f", *obs.Value) } logger.Info("Observation received", "topic", pkt.Topic, "station", obs.Station, "parameter", obs.Parameter, "time", obs.Time.Format(time.RFC3339), "value", valueStr, "user_properties", userProps, "retain", pkt.Retain, "qos", pkt.QoS) } func processForecastMessage(pkt *paho.Publish, logger *slog.Logger) { var fc weather.ForecastValue if err := json.Unmarshal(pkt.Payload, &fc); err != nil { logger.Error("Failed to parse forecast", "error", err, "topic", pkt.Topic) return } // Extract user properties userProps := make(map[string]string) if pkt.Properties != nil { for _, prop := range pkt.Properties.User { userProps[prop.Key] = prop.Value } } // Format value for display valueStr := "null" if fc.Value != nil { valueStr = fmt.Sprintf("%.2f", *fc.Value) } logger.Info("Forecast received", "topic", pkt.Topic, "model", fc.Model, "parameter", fc.Parameter, "run_time", fc.RunTime.Format(time.RFC3339), "forecast_time", fc.ForecastTime.Format(time.RFC3339), "location", fmt.Sprintf("(%.4f,%.4f)", fc.Location.Lat, fc.Location.Lon), "value", valueStr, "user_properties", userProps, "retain", pkt.Retain, "qos", pkt.QoS) }