diff options
Diffstat (limited to 'cmd/subscriber')
| -rw-r--r-- | cmd/subscriber/main.go | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/cmd/subscriber/main.go b/cmd/subscriber/main.go new file mode 100644 index 0000000..f561d60 --- /dev/null +++ b/cmd/subscriber/main.go @@ -0,0 +1,310 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/url" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/joho/godotenv" + "hub/internal/weather" +) + +// MessageHandler wraps the function to process incoming messages +type MessageHandler func(*paho.Publish) + +func main() { + // Load environment variables + _ = godotenv.Load() + + // Load configuration + cfg := weather.LoadSubscriberConfig() + + // Setup logging + var logLevel slog.Level + switch strings.ToLower(cfg.LogLevel) { + case "debug": + logLevel = slog.LevelDebug + case "warn", "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + default: + logLevel = slog.LevelInfo + } + + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })) + + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Info("Received signal", "signal", sig) + cancel() + }() + + // Run the subscriber + logger.Info("Starting MQTT v5 subscriber for FMI weather data", + "broker", cfg.MQTTBroker, + "topics", cfg.Topics, + "qos", cfg.QoS) + + if err := runSubscriber(ctx, cfg, logger); err != nil { + logger.Error("Subscriber failed", "error", err) + os.Exit(1) + } + + logger.Info("Subscriber stopped gracefully") +} + +func runSubscriber(ctx context.Context, cfg *weather.SubscriberConfig, logger *slog.Logger) error { + // Initialize statistics + stats := weather.InitTopicStats() + statsMutex := &sync.RWMutex{} + + // Create MQTT client with custom configuration + serverURL, err := url.Parse(cfg.MQTTBroker) + if err != nil { + return fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + // Create message handler + messageHandler := func(pr paho.PublishReceived) (bool, error) { + go handleMessage(pr.Packet, logger, stats, statsMutex) + return true, nil + } + + // Client configuration + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()), + OnPublishReceived: []func(paho.PublishReceived) (bool, error){ + messageHandler, + }, + OnServerDisconnect: func(d *paho.Disconnect) { + if d != nil && d.ReasonCode != 0 { + logger.Warn("MQTT disconnected", "reason", d.ReasonCode) + } else { + logger.Info("MQTT disconnected gracefully") + } + }, + }, + ConnectUsername: cfg.MQTTUsername, + ConnectPassword: []byte(cfg.MQTTPassword), + ConnectTimeout: 30 * time.Second, + } + + cliCfg.CleanStartOnInitialConnection = false + cliCfg.SessionExpiryInterval = uint32(cfg.MQTTSessionExp.Seconds()) + + // Connection callbacks + cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + logger.Info("MQTT v5 connected", + "session_present", connAck.SessionPresent, + "keep_alive", cfg.MQTTKeepAlive) + + // Subscribe to topics after connection is established + if len(cfg.Topics) > 0 { + subscriptions := make([]paho.SubscribeOptions, len(cfg.Topics)) + for i, topic := range cfg.Topics { + subscriptions[i] = paho.SubscribeOptions{ + Topic: topic, + QoS: byte(cfg.QoS), + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // FIXED: SubscriptionIdentifier is a pointer + subID := int(1) + subProps := &paho.SubscribeProperties{ + SubscriptionIdentifier: &subID, + } + + if _, err := cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: subscriptions, + Properties: subProps, + }); err != nil { + logger.Error("Failed to subscribe", "error", err) + } else { + logger.Info("Subscribed to topics", + "topics", cfg.Topics, + "qos", cfg.QoS) + } + } + } + + cliCfg.OnConnectError = func(err error) { + logger.Error("MQTT connection error", "error", err) + } + + // Connect to broker + mqttClient, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return fmt.Errorf("failed to create MQTT connection: %w", err) + } + + // Wait for connection + err = mqttClient.AwaitConnection(ctx) + if err != nil { + return fmt.Errorf("failed to establish MQTT connection: %w", err) + } + + defer func() { + discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := mqttClient.Disconnect(discCtx); err != nil { + logger.Error("Failed to disconnect MQTT", "error", err) + } + }() + + logger.Info("Subscriber started and waiting for messages...") + + // Statistics reporting goroutine + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Print final statistics + statsMutex.RLock() + finalStats := stats.MessagesReceived + statsMutex.RUnlock() + + if len(finalStats) > 0 { + logger.Info("Final message statistics", "stats", finalStats) + } + return nil + + case <-ticker.C: + if cfg.EnableDebug { + statsMutex.RLock() + currentStats := stats.MessagesReceived + statsMutex.RUnlock() + + if len(currentStats) > 0 { + logger.Debug("Message statistics", "stats", currentStats) + } + } + } + } +} + +func handleMessage(pkt *paho.Publish, logger *slog.Logger, + stats *weather.TopicStats, mutex *sync.RWMutex) { + + // Update statistics + mutex.Lock() + stats.MessagesReceived[pkt.Topic]++ + stats.LastMessageTime[pkt.Topic] = time.Now() + mutex.Unlock() + + // Route based on topic pattern + switch { + case strings.HasPrefix(pkt.Topic, "weather/obs/"): + processObservationMessage(pkt, logger) + case strings.HasPrefix(pkt.Topic, "weather/forecast/"): + processForecastMessage(pkt, logger) + default: + logger.Warn("Unknown topic pattern", "topic", pkt.Topic) + // Try to decode as generic JSON + var data map[string]interface{} + if err := json.Unmarshal(pkt.Payload, &data); err == nil { + logger.Info("Generic message", "topic", pkt.Topic, "data", data) + } else { + logger.Info("Raw message", "topic", pkt.Topic, "payload", string(pkt.Payload)) + } + } +} + +func processObservationMessage(pkt *paho.Publish, logger *slog.Logger) { + var obs weather.Observation + if err := json.Unmarshal(pkt.Payload, &obs); err != nil { + logger.Error("Failed to parse observation", + "error", err, + "topic", pkt.Topic) + return + } + + // Extract user properties + userProps := make(map[string]string) + if pkt.Properties != nil { + for _, prop := range pkt.Properties.User { + userProps[prop.Key] = prop.Value + } + } + + // Format value for display + valueStr := "null" + if obs.Value != nil { + valueStr = fmt.Sprintf("%.2f", *obs.Value) + } + + logger.Info("Observation received", + "topic", pkt.Topic, + "station", obs.Station, + "parameter", obs.Parameter, + "time", obs.Time.Format(time.RFC3339), + "value", valueStr, + "user_properties", userProps, + "retain", pkt.Retain, + "qos", pkt.QoS) +} + +func processForecastMessage(pkt *paho.Publish, logger *slog.Logger) { + var fc weather.ForecastValue + if err := json.Unmarshal(pkt.Payload, &fc); err != nil { + logger.Error("Failed to parse forecast", + "error", err, + "topic", pkt.Topic) + return + } + + // Extract user properties + userProps := make(map[string]string) + if pkt.Properties != nil { + for _, prop := range pkt.Properties.User { + userProps[prop.Key] = prop.Value + } + } + + // Format value for display + valueStr := "null" + if fc.Value != nil { + valueStr = fmt.Sprintf("%.2f", *fc.Value) + } + + logger.Info("Forecast received", + "topic", pkt.Topic, + "model", fc.Model, + "parameter", fc.Parameter, + "run_time", fc.RunTime.Format(time.RFC3339), + "forecast_time", fc.ForecastTime.Format(time.RFC3339), + "location", fmt.Sprintf("(%.4f,%.4f)", fc.Location.Lat, fc.Location.Lon), + "value", valueStr, + "user_properties", userProps, + "retain", pkt.Retain, + "qos", pkt.QoS) +} |
