aboutsummaryrefslogtreecommitdiffstats
path: root/cmd/subscriber
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/subscriber')
-rw-r--r--cmd/subscriber/main.go310
1 files changed, 310 insertions, 0 deletions
diff --git a/cmd/subscriber/main.go b/cmd/subscriber/main.go
new file mode 100644
index 0000000..f561d60
--- /dev/null
+++ b/cmd/subscriber/main.go
@@ -0,0 +1,310 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "net/url"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/joho/godotenv"
+ "hub/internal/weather"
+)
+
+// MessageHandler wraps the function to process incoming messages
+type MessageHandler func(*paho.Publish)
+
+func main() {
+ // Load environment variables
+ _ = godotenv.Load()
+
+ // Load configuration
+ cfg := weather.LoadSubscriberConfig()
+
+ // Setup logging
+ var logLevel slog.Level
+ switch strings.ToLower(cfg.LogLevel) {
+ case "debug":
+ logLevel = slog.LevelDebug
+ case "warn", "warning":
+ logLevel = slog.LevelWarn
+ case "error":
+ logLevel = slog.LevelError
+ default:
+ logLevel = slog.LevelInfo
+ }
+
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+ Level: logLevel,
+ }))
+
+ // Create context for graceful shutdown
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Handle signals
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ sig := <-sigChan
+ logger.Info("Received signal", "signal", sig)
+ cancel()
+ }()
+
+ // Run the subscriber
+ logger.Info("Starting MQTT v5 subscriber for FMI weather data",
+ "broker", cfg.MQTTBroker,
+ "topics", cfg.Topics,
+ "qos", cfg.QoS)
+
+ if err := runSubscriber(ctx, cfg, logger); err != nil {
+ logger.Error("Subscriber failed", "error", err)
+ os.Exit(1)
+ }
+
+ logger.Info("Subscriber stopped gracefully")
+}
+
+func runSubscriber(ctx context.Context, cfg *weather.SubscriberConfig, logger *slog.Logger) error {
+ // Initialize statistics
+ stats := weather.InitTopicStats()
+ statsMutex := &sync.RWMutex{}
+
+ // Create MQTT client with custom configuration
+ serverURL, err := url.Parse(cfg.MQTTBroker)
+ if err != nil {
+ return fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ // Create message handler
+ messageHandler := func(pr paho.PublishReceived) (bool, error) {
+ go handleMessage(pr.Packet, logger, stats, statsMutex)
+ return true, nil
+ }
+
+ // Client configuration
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.MQTTKeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.MQTTClientID, os.Getpid()),
+ OnPublishReceived: []func(paho.PublishReceived) (bool, error){
+ messageHandler,
+ },
+ OnServerDisconnect: func(d *paho.Disconnect) {
+ if d != nil && d.ReasonCode != 0 {
+ logger.Warn("MQTT disconnected", "reason", d.ReasonCode)
+ } else {
+ logger.Info("MQTT disconnected gracefully")
+ }
+ },
+ },
+ ConnectUsername: cfg.MQTTUsername,
+ ConnectPassword: []byte(cfg.MQTTPassword),
+ ConnectTimeout: 30 * time.Second,
+ }
+
+ cliCfg.CleanStartOnInitialConnection = false
+ cliCfg.SessionExpiryInterval = uint32(cfg.MQTTSessionExp.Seconds())
+
+ // Connection callbacks
+ cliCfg.OnConnectionUp = func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ logger.Info("MQTT v5 connected",
+ "session_present", connAck.SessionPresent,
+ "keep_alive", cfg.MQTTKeepAlive)
+
+ // Subscribe to topics after connection is established
+ if len(cfg.Topics) > 0 {
+ subscriptions := make([]paho.SubscribeOptions, len(cfg.Topics))
+ for i, topic := range cfg.Topics {
+ subscriptions[i] = paho.SubscribeOptions{
+ Topic: topic,
+ QoS: byte(cfg.QoS),
+ }
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // FIXED: SubscriptionIdentifier is a pointer
+ subID := int(1)
+ subProps := &paho.SubscribeProperties{
+ SubscriptionIdentifier: &subID,
+ }
+
+ if _, err := cm.Subscribe(ctx, &paho.Subscribe{
+ Subscriptions: subscriptions,
+ Properties: subProps,
+ }); err != nil {
+ logger.Error("Failed to subscribe", "error", err)
+ } else {
+ logger.Info("Subscribed to topics",
+ "topics", cfg.Topics,
+ "qos", cfg.QoS)
+ }
+ }
+ }
+
+ cliCfg.OnConnectError = func(err error) {
+ logger.Error("MQTT connection error", "error", err)
+ }
+
+ // Connect to broker
+ mqttClient, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ // Wait for connection
+ err = mqttClient.AwaitConnection(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to establish MQTT connection: %w", err)
+ }
+
+ defer func() {
+ discCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := mqttClient.Disconnect(discCtx); err != nil {
+ logger.Error("Failed to disconnect MQTT", "error", err)
+ }
+ }()
+
+ logger.Info("Subscriber started and waiting for messages...")
+
+ // Statistics reporting goroutine
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Print final statistics
+ statsMutex.RLock()
+ finalStats := stats.MessagesReceived
+ statsMutex.RUnlock()
+
+ if len(finalStats) > 0 {
+ logger.Info("Final message statistics", "stats", finalStats)
+ }
+ return nil
+
+ case <-ticker.C:
+ if cfg.EnableDebug {
+ statsMutex.RLock()
+ currentStats := stats.MessagesReceived
+ statsMutex.RUnlock()
+
+ if len(currentStats) > 0 {
+ logger.Debug("Message statistics", "stats", currentStats)
+ }
+ }
+ }
+ }
+}
+
+func handleMessage(pkt *paho.Publish, logger *slog.Logger,
+ stats *weather.TopicStats, mutex *sync.RWMutex) {
+
+ // Update statistics
+ mutex.Lock()
+ stats.MessagesReceived[pkt.Topic]++
+ stats.LastMessageTime[pkt.Topic] = time.Now()
+ mutex.Unlock()
+
+ // Route based on topic pattern
+ switch {
+ case strings.HasPrefix(pkt.Topic, "weather/obs/"):
+ processObservationMessage(pkt, logger)
+ case strings.HasPrefix(pkt.Topic, "weather/forecast/"):
+ processForecastMessage(pkt, logger)
+ default:
+ logger.Warn("Unknown topic pattern", "topic", pkt.Topic)
+ // Try to decode as generic JSON
+ var data map[string]interface{}
+ if err := json.Unmarshal(pkt.Payload, &data); err == nil {
+ logger.Info("Generic message", "topic", pkt.Topic, "data", data)
+ } else {
+ logger.Info("Raw message", "topic", pkt.Topic, "payload", string(pkt.Payload))
+ }
+ }
+}
+
+func processObservationMessage(pkt *paho.Publish, logger *slog.Logger) {
+ var obs weather.Observation
+ if err := json.Unmarshal(pkt.Payload, &obs); err != nil {
+ logger.Error("Failed to parse observation",
+ "error", err,
+ "topic", pkt.Topic)
+ return
+ }
+
+ // Extract user properties
+ userProps := make(map[string]string)
+ if pkt.Properties != nil {
+ for _, prop := range pkt.Properties.User {
+ userProps[prop.Key] = prop.Value
+ }
+ }
+
+ // Format value for display
+ valueStr := "null"
+ if obs.Value != nil {
+ valueStr = fmt.Sprintf("%.2f", *obs.Value)
+ }
+
+ logger.Info("Observation received",
+ "topic", pkt.Topic,
+ "station", obs.Station,
+ "parameter", obs.Parameter,
+ "time", obs.Time.Format(time.RFC3339),
+ "value", valueStr,
+ "user_properties", userProps,
+ "retain", pkt.Retain,
+ "qos", pkt.QoS)
+}
+
+func processForecastMessage(pkt *paho.Publish, logger *slog.Logger) {
+ var fc weather.ForecastValue
+ if err := json.Unmarshal(pkt.Payload, &fc); err != nil {
+ logger.Error("Failed to parse forecast",
+ "error", err,
+ "topic", pkt.Topic)
+ return
+ }
+
+ // Extract user properties
+ userProps := make(map[string]string)
+ if pkt.Properties != nil {
+ for _, prop := range pkt.Properties.User {
+ userProps[prop.Key] = prop.Value
+ }
+ }
+
+ // Format value for display
+ valueStr := "null"
+ if fc.Value != nil {
+ valueStr = fmt.Sprintf("%.2f", *fc.Value)
+ }
+
+ logger.Info("Forecast received",
+ "topic", pkt.Topic,
+ "model", fc.Model,
+ "parameter", fc.Parameter,
+ "run_time", fc.RunTime.Format(time.RFC3339),
+ "forecast_time", fc.ForecastTime.Format(time.RFC3339),
+ "location", fmt.Sprintf("(%.4f,%.4f)", fc.Location.Lat, fc.Location.Lon),
+ "value", valueStr,
+ "user_properties", userProps,
+ "retain", pkt.Retain,
+ "qos", pkt.QoS)
+}