diff options
Diffstat (limited to 'internal/weather/mqtt.go')
| -rw-r--r-- | internal/weather/mqtt.go | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go new file mode 100644 index 0000000..211b441 --- /dev/null +++ b/internal/weather/mqtt.go @@ -0,0 +1,76 @@ +package weather + +import ( + "context" + "crypto/tls" + "fmt" + "net/url" + "os" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" +) + +// MQTTConfig holds MQTT connection configuration +type MQTTConfig struct { + Broker string + ClientID string + Username string + Password string + KeepAlive time.Duration + SessionExpiry time.Duration + Timeout time.Duration + OnPublishReceived []func(paho.PublishReceived) (bool, error) +} + +// CreateMQTTClient creates a MQTT client with the given configuration +func CreateMQTTClient(ctx context.Context, cfg MQTTConfig, + onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) { + + serverURL, err := url.Parse(cfg.Broker) + if err != nil { + return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) + } + + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{serverURL}, + KeepAlive: uint16(cfg.KeepAlive.Seconds()), + ClientConfig: paho.ClientConfig{ + ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()), + OnPublishReceived: cfg.OnPublishReceived, + OnServerDisconnect: func(d *paho.Disconnect) { + + // Default disconnect handler + }, + }, + ConnectUsername: cfg.Username, + ConnectPassword: []byte(cfg.Password), + ConnectTimeout: cfg.Timeout, + } + + cliCfg.CleanStartOnInitialConnection = false + cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds()) + + if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { + cliCfg.TlsCfg = &tls.Config{ + MinVersion: tls.VersionTLS12, + } + } + + if onConnect != nil { + cliCfg.OnConnectionUp = onConnect + } + + cm, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + return nil, fmt.Errorf("failed to create MQTT connection: %w", err) + } + + err = cm.AwaitConnection(ctx) + if err != nil { + return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) + } + + return cm, nil +} |
