aboutsummaryrefslogtreecommitdiffstats
path: root/internal/weather/mqtt.go
diff options
context:
space:
mode:
authorPetri Hienonen <petri.hienonen@gmail.com>2026-01-04 11:52:47 +0200
committerPetri Hienonen <petri.hienonen@gmail.com>2026-01-04 11:52:47 +0200
commit59491201976316a30ffc475dd99b0af02b5e997d (patch)
treeecf395594d5d289d855eba16f786e0fb66c1d814 /internal/weather/mqtt.go
parent4e0ca0509c6b314eea8a7b2df6d093f5d9b7e70f (diff)
downloadpub-59491201976316a30ffc475dd99b0af02b5e997d.tar.zst
Both publisher and subscriber
Diffstat (limited to 'internal/weather/mqtt.go')
-rw-r--r--internal/weather/mqtt.go76
1 files changed, 76 insertions, 0 deletions
diff --git a/internal/weather/mqtt.go b/internal/weather/mqtt.go
new file mode 100644
index 0000000..211b441
--- /dev/null
+++ b/internal/weather/mqtt.go
@@ -0,0 +1,76 @@
+package weather
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "net/url"
+ "os"
+ "time"
+
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+)
+
+// MQTTConfig holds MQTT connection configuration
+type MQTTConfig struct {
+ Broker string
+ ClientID string
+ Username string
+ Password string
+ KeepAlive time.Duration
+ SessionExpiry time.Duration
+ Timeout time.Duration
+ OnPublishReceived []func(paho.PublishReceived) (bool, error)
+}
+
+// CreateMQTTClient creates a MQTT client with the given configuration
+func CreateMQTTClient(ctx context.Context, cfg MQTTConfig,
+ onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) {
+
+ serverURL, err := url.Parse(cfg.Broker)
+ if err != nil {
+ return nil, fmt.Errorf("invalid MQTT broker URL: %w", err)
+ }
+
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{serverURL},
+ KeepAlive: uint16(cfg.KeepAlive.Seconds()),
+ ClientConfig: paho.ClientConfig{
+ ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()),
+ OnPublishReceived: cfg.OnPublishReceived,
+ OnServerDisconnect: func(d *paho.Disconnect) {
+
+ // Default disconnect handler
+ },
+ },
+ ConnectUsername: cfg.Username,
+ ConnectPassword: []byte(cfg.Password),
+ ConnectTimeout: cfg.Timeout,
+ }
+
+ cliCfg.CleanStartOnInitialConnection = false
+ cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds())
+
+ if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" {
+ cliCfg.TlsCfg = &tls.Config{
+ MinVersion: tls.VersionTLS12,
+ }
+ }
+
+ if onConnect != nil {
+ cliCfg.OnConnectionUp = onConnect
+ }
+
+ cm, err := autopaho.NewConnection(ctx, cliCfg)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create MQTT connection: %w", err)
+ }
+
+ err = cm.AwaitConnection(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to establish MQTT connection: %w", err)
+ }
+
+ return cm, nil
+}