Mosquitto 订阅者收到额外消息
Mosquitto subscriber receiving extra message
我作为一个完全的初学者正在探索蚊子。我有一个 golang 测试程序,我正在使用它来查看它如何管理消息,但是程序的后续 运行s 显示收到了额外的消息。
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"strconv"
"time"
)
var sent int
var received int
func Test() {
sent = 0
received = 0
const TOPIC = "mytopic/test"
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("client1").SetResumeSubs(false).SetAutoReconnect(false).SetCleanSession(true)
client1 := mqtt.NewClient(opts)
if token := client1.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := client1.Subscribe(TOPIC, 2, receiver1); token.Wait() && token.Error() != nil {
panic(token.Error())
}
defer howManySent()
defer howManyReceived()
defer client1.Disconnect(1)
for index:=0;index<3;index++ {
sendMsg := "client1 message " + strconv.Itoa(index)
if token := client1.Publish(TOPIC, 2, true, sendMsg); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
sent++
}
time.Sleep(1 * time.Second)
}
time.Sleep(1*time.Second)
}
func howManyReceived() {
log.Warn("RECEIVED:", received)
}
func howManySent() {
log.Warn("SENT:", sent)
}
func receiver1(client mqtt.Client, msg mqtt.Message) {
received++
msg.Ack()
output := "message id:" + strconv.Itoa(int(msg.MessageID())) + " message = " + string(msg.Payload())
log.Warn(output)
}
第一个 运行 的输出:
WARN[0000] message id:1 message = client1 message 0
WARN[0001] message id:2 message = client1 message 1
WARN[0002] message id:3 message = client1 message 2
WARN[0004] RECEIVED:3
WARN[0004] SENT:3
第二个 运行 的输出:
WARN[0000] message id:4 message = client1 message 2
WARN[0000] message id:5 message = client1 message 0
WARN[0001] message id:6 message = client1 message 1
WARN[0002] message id:7 message = client1 message 2
WARN[0004] RECEIVED:4
WARN[0004] SENT:3
我的配置文件默认是随 mosquitto 安装的,除了我关闭了持久性以查看它是否有帮助:
# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example
pid_file /var/run/mosquitto.pid
persistence false
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
我做错了什么?
Publish定义如下:
Publish(topic string, qos byte, retained bool, payload interface{}) Token
因此,当您调用 token := client1.Publish(TOPIC, 2, true, sendMsg)
时,您正在发布 retained
设置为 true 的消息。 Retain 是随消息传递给代理的标志,表示:
If the RETAIN flag is set to 1, in a PUBLISH Packet sent by a Client to a Server, the Server MUST store the Application Message and its QoS, so that it can be delivered to future subscribers whose subscriptions match its topic name [MQTT-3.3.1-5]. When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6].
设置了 retain
的每条新消息都会替换之前的任何消息,因此当您的第一次执行完成时,消息 2
将被保留。这会在客户端重新连接时发送给客户端(这意味着您看到的结果是预期的)。如果您将调用更改为 token := client1.Publish(TOPIC, 2, false, sendMsg)
,它将按照您的预期运行。
mosquitto docs 对 persistence
标志说如下:
If true, connection, subscription and message data will be written to the disk in mosquitto.db at the location dictated by persistence_location. When mosquitto is restarted, it will reload the information stored in mosquitto.db
所以这只会在您重新启动 Mosquitto 时产生影响;即使此标志设置为 false
.
,会话信息仍将存储在内存中
我作为一个完全的初学者正在探索蚊子。我有一个 golang 测试程序,我正在使用它来查看它如何管理消息,但是程序的后续 运行s 显示收到了额外的消息。
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"strconv"
"time"
)
var sent int
var received int
func Test() {
sent = 0
received = 0
const TOPIC = "mytopic/test"
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("client1").SetResumeSubs(false).SetAutoReconnect(false).SetCleanSession(true)
client1 := mqtt.NewClient(opts)
if token := client1.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := client1.Subscribe(TOPIC, 2, receiver1); token.Wait() && token.Error() != nil {
panic(token.Error())
}
defer howManySent()
defer howManyReceived()
defer client1.Disconnect(1)
for index:=0;index<3;index++ {
sendMsg := "client1 message " + strconv.Itoa(index)
if token := client1.Publish(TOPIC, 2, true, sendMsg); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
sent++
}
time.Sleep(1 * time.Second)
}
time.Sleep(1*time.Second)
}
func howManyReceived() {
log.Warn("RECEIVED:", received)
}
func howManySent() {
log.Warn("SENT:", sent)
}
func receiver1(client mqtt.Client, msg mqtt.Message) {
received++
msg.Ack()
output := "message id:" + strconv.Itoa(int(msg.MessageID())) + " message = " + string(msg.Payload())
log.Warn(output)
}
第一个 运行 的输出:
WARN[0000] message id:1 message = client1 message 0
WARN[0001] message id:2 message = client1 message 1
WARN[0002] message id:3 message = client1 message 2
WARN[0004] RECEIVED:3
WARN[0004] SENT:3
第二个 运行 的输出:
WARN[0000] message id:4 message = client1 message 2
WARN[0000] message id:5 message = client1 message 0
WARN[0001] message id:6 message = client1 message 1
WARN[0002] message id:7 message = client1 message 2
WARN[0004] RECEIVED:4
WARN[0004] SENT:3
我的配置文件默认是随 mosquitto 安装的,除了我关闭了持久性以查看它是否有帮助:
# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example
pid_file /var/run/mosquitto.pid
persistence false
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
我做错了什么?
Publish定义如下:
Publish(topic string, qos byte, retained bool, payload interface{}) Token
因此,当您调用 token := client1.Publish(TOPIC, 2, true, sendMsg)
时,您正在发布 retained
设置为 true 的消息。 Retain 是随消息传递给代理的标志,表示:
If the RETAIN flag is set to 1, in a PUBLISH Packet sent by a Client to a Server, the Server MUST store the Application Message and its QoS, so that it can be delivered to future subscribers whose subscriptions match its topic name [MQTT-3.3.1-5]. When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6].
设置了 retain
的每条新消息都会替换之前的任何消息,因此当您的第一次执行完成时,消息 2
将被保留。这会在客户端重新连接时发送给客户端(这意味着您看到的结果是预期的)。如果您将调用更改为 token := client1.Publish(TOPIC, 2, false, sendMsg)
,它将按照您的预期运行。
mosquitto docs 对 persistence
标志说如下:
If true, connection, subscription and message data will be written to the disk in mosquitto.db at the location dictated by persistence_location. When mosquitto is restarted, it will reload the information stored in mosquitto.db
所以这只会在您重新启动 Mosquitto 时产生影响;即使此标志设置为 false
.