go实现mqtt订阅与推送

搭建mqtt服务端

安装emqx

本文安装emqx作为服务端,请自行安装docker

1
2
3
4
5
# 拉取emqx最新镜像
docker pull emqx

# 创建并运行emqx容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx

浏览器打开http://ip:18083,账号默认为admin,密码默认为public

emqx控制台

安装mqttx

有需要可以选择安装mqttx

1
2
3
4
5
# 拉取mqttx镜像
docker pull emqx/mqttx-web

# 创建并运行mqttx容器
docker run -d --name mqttx-web -p 18830:80 emqx/mqttx-web

浏览器打开http://ip:18830

mqttx页面

go创建mqtt客户端

mqtt配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
mqtt:
# 地址
broker: 127.0.0.1:1883
# 账号
username: admin
# 密码
password: public
# 客户端id
clientID: go_mqtt_client
# 超时时间,单位为秒
timeOut: 60
# 订阅主题
topicName: testTopic

加载mqtt配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// mqtt配置文件结构体
type MqttConfig struct {
Broker string `yaml:"mqtt.broker"`
UserName string `yaml:"mqtt.username"`
Password string `yaml:"mqtt.password"`
ClientID string `yaml:"mqtt.clientID"`
TimeOut int64 `yaml:"mqtt.timeOut"`
TopicName string `yaml:"mqtt.topicName"`
}

// mqtt配置文件
var mqttConfig *MqttConfig

// 加载mqtt配置文件
func loadMqttConfig() *MqttConfig {
mqttConfig := MqttConfig{
Broker: viper.GetString("mqtt.broker"),
UserName: viper.GetString("mqtt.username"),
Password: viper.GetString("mqtt.password"),
ClientID: viper.GetString("mqtt.clientID"),
TimeOut: viper.GetInt64("mqtt.timeOut"),
TopicName: viper.GetString("mqtt.topicName"),
}
return &mqttConfig
}

func init() {
mqttConfig = loadMqttConfig()
}

// 获取mqtt配置文件
func GetMqttConfig() *MqttConfig {
return mqttConfig
}

设置mqtt客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// mqtt客户端
var client mqtt.Client

// 设置mqtt客户端,并进行连接
func SetMqttClient(mqttConfig *config.MqttConfig) {
// 配置信息
clientOptions := mqtt.NewClientOptions().AddBroker(mqttConfig.Broker).SetUsername(mqttConfig.UserName).SetPassword(mqttConfig.Password)
clientOptions.SetClientID(mqttConfig.ClientID)
clientOptions.SetConnectTimeout(time.Duration(mqttConfig.TimeOut) * time.Second)
// 创建客户端
client = mqtt.NewClient(clientOptions)
}

// 连接
func Connect(timeOut int64) {
// 判断客户端连接状态
if token := client.Connect(); token.WaitTimeout(time.Duration(timeOut) * time.Second) && token.Wait() && token.Error() != nil {
panic(token.Error())
}
}

// 订阅主题
func DoSubscribe(topic string, doMessage mqtt.MessageHandler) {
for {
token := client.Subscribe(topic, 1, doMessage)
token.Wait()
}
}

推送数据

1
2
3
4
5
6
7
8
// 推送数据
func DoPublish(client mqtt.Client, topicName string, content interface{}) {
for {
// 往主题推送数据
client.Publish(topicName, 1, false, content)
time.Sleep(time.Duration(3) * time.Second)
}
}

运行

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
// 获取mqtt配置
mqttConfig := config.GetMqttConfig()
// 设置mqtt客户端
MqttUtil.SetMqttClient(mqttConfig)
go MqttUtil.DoPublish(mqttConfig.TopicName, "testPublish...")
MqttUtil.Connect(mqttConfig.TimeOut)
// 订阅主题,从主题中获取数据
// 参数:主题name、回调函数
MqttUtil.DoSubscribe(mqttConfig.TopicName, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload())
})
}