go实现mqtt订阅与推送
搭建mqtt服务端
安装emqx
本文使用emqx作为mqtt服务端,并通过docker安装,请先自行安装docker
1 2 3 4 5
# 拉取emqx最新镜像
docker pull emqx
# 创建并运行emqx容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx
|
浏览器打开 http://ip:18083 ,账号默认为admin,密码默认为public
安装mqttx
有需要可以选择安装mqttx
1 2 3 4 5
# 拉取mqttx镜像
docker pull emqx/mqttx-web
# 创建并运行mqttx容器
docker run -d --name mqttx-web -p 18830:80 emqx/mqttx-web
|
浏览器打开http://ip:18830
go创建mqtt客户端
mqtt配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13
mqtt:
  broker: 127.0.0.1:1883
  username: admin
  password: public
  clientID: go_mqtt_client
  timeOut: 60
  topicName: testTopic
|
加载mqtt配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
// MqttConfig holds the connection settings used to build the MQTT client.
//
// The yaml tags name the leaf keys found inside the `mqtt:` section of the
// configuration file. YAML decoders do not resolve dotted paths such as
// "mqtt.broker", so a tag must carry only the key at its own nesting level.
type MqttConfig struct {
	// Broker is the broker address, e.g. "127.0.0.1:1883".
	Broker string `yaml:"broker"`
	// UserName is the account used to authenticate with the broker.
	UserName string `yaml:"username"`
	// Password is the password used to authenticate with the broker.
	Password string `yaml:"password"`
	// ClientID is the identifier this client presents to the broker.
	ClientID string `yaml:"clientID"`
	// TimeOut is the connect timeout, expressed in seconds.
	TimeOut int64 `yaml:"timeOut"`
	// TopicName is the topic used for both publishing and subscribing.
	TopicName string `yaml:"topicName"`
}
// mqttConfig is the package-level MQTT configuration. It is assigned in init
// and handed out by GetMqttConfig.
var mqttConfig *MqttConfig
func loadMqttConfig() *MqttConfig { mqttConfig := MqttConfig{ Broker: viper.GetString("mqtt.broker"), UserName: viper.GetString("mqtt.username"), Password: viper.GetString("mqtt.password"), ClientID: viper.GetString("mqtt.clientID"), TimeOut: viper.GetInt64("mqtt.timeOut"), TopicName: viper.GetString("mqtt.topicName"), } return &mqttConfig }
// init fills the package-level mqttConfig when the package is initialised.
// NOTE(review): this reads viper during package initialisation; presumably the
// configuration file has already been loaded into viper by then — confirm.
func init() { mqttConfig = loadMqttConfig() }
// GetMqttConfig returns the package-level MQTT configuration pointer.
func GetMqttConfig() *MqttConfig { return mqttConfig }
|
设置mqtt客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| var client mqtt.Client
func SetMqttClient(mqttConfig *config.MqttConfig) { clientOptions := mqtt.NewClientOptions().AddBroker(mqttConfig.Broker).SetUsername(mqttConfig.UserName).SetPassword(mqttConfig.Password) clientOptions.SetClientID(mqttConfig.ClientID) clientOptions.SetConnectTimeout(time.Duration(mqttConfig.TimeOut) * time.Second) client = mqtt.NewClient(clientOptions) }
// Connect connects the package-level client to the broker and panics if the
// connection cannot be established.
//
// timeOut is the maximum time to wait, in seconds. A value that is zero or
// negative means "wait without a deadline".
//
// Panics with the connection error when the broker rejects or cannot be
// reached, and with a descriptive message when the wait times out. Previously
// a timeout was silently ignored, leaving callers with an unconnected client.
func Connect(timeOut int64) {
	token := client.Connect()

	if timeOut > 0 {
		// WaitTimeout reports false when the deadline passes before the
		// connect attempt has completed.
		if !token.WaitTimeout(time.Duration(timeOut) * time.Second) {
			panic("mqtt: connect timed out")
		}
	} else {
		token.Wait()
	}

	if err := token.Error(); err != nil {
		panic(err)
	}
}
func DoSubscribe(topic string, doMessage mqtt.MessageHandler) { for { token := client.Subscribe(topic, 1, doMessage) token.Wait() } }
|
推送数据
1 2 3 4 5 6 7 8
| func DoPublish(client mqtt.Client, topicName string, content interface{}) { for { client.Publish(topicName, 1, false, content) time.Sleep(time.Duration(3) * time.Second) } }
|
运行
1 2 3 4 5 6 7 8 9 10 11 12 13
| func main() { mqttConfig := config.GetMqttConfig() MqttUtil.SetMqttClient(mqttConfig) go MqttUtil.DoPublish(mqttConfig.TopicName, "testPublish...") MqttUtil.Connect(mqttConfig.TimeOut) MqttUtil.DoSubscribe(mqttConfig.TopicName, func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload()) }) }
|