package weather import ( "context" "crypto/tls" "fmt" "net/url" "os" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" ) // MQTTConfig holds MQTT connection configuration type MQTTConfig struct { Broker string ClientID string Username string Password string KeepAlive time.Duration SessionExpiry time.Duration Timeout time.Duration OnPublishReceived []func(paho.PublishReceived) (bool, error) } // CreateMQTTClient creates a MQTT client with the given configuration func CreateMQTTClient(ctx context.Context, cfg MQTTConfig, onConnect func(*autopaho.ConnectionManager, *paho.Connack)) (*autopaho.ConnectionManager, error) { serverURL, err := url.Parse(cfg.Broker) if err != nil { return nil, fmt.Errorf("invalid MQTT broker URL: %w", err) } cliCfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, KeepAlive: uint16(cfg.KeepAlive.Seconds()), ClientConfig: paho.ClientConfig{ ClientID: fmt.Sprintf("%s-%d", cfg.ClientID, os.Getpid()), OnPublishReceived: cfg.OnPublishReceived, OnServerDisconnect: func(d *paho.Disconnect) { // Default disconnect handler }, }, ConnectUsername: cfg.Username, ConnectPassword: []byte(cfg.Password), ConnectTimeout: cfg.Timeout, } cliCfg.CleanStartOnInitialConnection = false cliCfg.SessionExpiryInterval = uint32(cfg.SessionExpiry.Seconds()) if serverURL.Scheme == "ssl" || serverURL.Scheme == "tls" { cliCfg.TlsCfg = &tls.Config{ MinVersion: tls.VersionTLS12, } } if onConnect != nil { cliCfg.OnConnectionUp = onConnect } cm, err := autopaho.NewConnection(ctx, cliCfg) if err != nil { return nil, fmt.Errorf("failed to create MQTT connection: %w", err) } err = cm.AwaitConnection(ctx) if err != nil { return nil, fmt.Errorf("failed to establish MQTT connection: %w", err) } return cm, nil }