蚊子的持久性不一致

inconsistent persistence in mosquitto

我在 mosquitto 上发现消息传递与消息持久性和 qos=2 不一致。我做错了什么吗?

我有一个简单的测试应用程序,它使用 clientId="receive-client" 注册消费主题,但立即断开连接。然后它作为 clientId="send-client" 连接并发布 10 条消息,"message #1" ... "message #10"。然后断开连接,等待五秒钟,然后连接以再次使用“receive-client”进行消费,同时打印和计算收到的消息。

结果不一致。有时我收到 6 条消息,有时 8 条。典型的输出是这样的:

WARN[0005] GOT A MESSAGE:message #1                     
WARN[0005] GOT A MESSAGE:message #2                     
WARN[0005] GOT A MESSAGE:message #3                     
WARN[0005] GOT A MESSAGE:message #4                     
WARN[0005] GOT A MESSAGE:message #5                     
WARN[0005] GOT A MESSAGE:message #6                     
WARN[0005] GOT A MESSAGE:message #7                     
WARN[0005] GOT A MESSAGE:message #8                     
WARN[0305] PAUSE                                        
WARN[0605] received message count=8                     

我的版本信息是 1.4.15。我的 mosquitto.conf 是:

pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

allow_anonymous false
password_file /etc/mosquitto/passwd

log_dest file /var/log/mosquitto/mosquitto.log

最初 /var/lib/mosquitto/mosquitto.db 直到几次迭代 运行 才出现。我的测试应用程序在这里:

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    log "github.com/sirupsen/logrus"
    "time"
)

var receivedMsg int

func Persist() {
    const TOPIC = "test"
    const URL = "tcp://localhost:1883"
    const USERNAME = "myuser"
    const PASSWORD = "mypassword"

    defer printReceived()

    options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
    options.SetCleanSession(false)
    options.SetConnectRetry(true)
    options.SetConnectRetryInterval(10 * time.Millisecond)

    // register the receive client with broker / TOPIC
    // to be sure the broker knows it needs to save our messages
    // to deliver at a later time
    options.SetClientID("receive-client")
    client := mqtt.NewClient(options)
    token := client.Connect()
    token.Wait()
    if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    client.Disconnect(0)

    // connect with send client and send 10 messages
    options.SetClientID("send-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    client.Publish(TOPIC, 2, false, "message #1")
    client.Publish(TOPIC, 2, false, "message #2")
    client.Publish(TOPIC, 2, false, "message #3")
    client.Publish(TOPIC, 2, false, "message #4")
    client.Publish(TOPIC, 2, false, "message #5")
    client.Publish(TOPIC, 2, false, "message #6")
    client.Publish(TOPIC, 2, false, "message #7")
    client.Publish(TOPIC, 2, false, "message #8")
    client.Publish(TOPIC, 2, false, "message #9")
    client.Publish(TOPIC, 2, false, "message #10")
    client.Disconnect(4)
    time.Sleep(5* time.Second)

    // subscribe again and try to retrieve the messages we missed
    options.SetClientID("receive-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    time.Sleep(300 * time.Second)
    log.Warn("PAUSE")
    time.Sleep(300 * time.Second)
}

func consume1(client mqtt.Client, msg mqtt.Message) {
    receivedMsg++
    log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}

func consume2(client mqtt.Client, msg mqtt.Message) {
    receivedMsg++
    log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}

func printReceived() {
    log.Warn("received message count=", receivedMsg)
}

以 QOS 2 发布是一个多步骤过程,因此最可能的原因是您在所有消息实际完成发布到代理之前断开发布客户端。 您可能应该在循环中发布并使用从对 client.publish() 的调用返回的令牌等到它完成后再断开客户端。

例如如示例所示:

//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
  text := fmt.Sprintf("this is msg #%d!", i)
  token := c.Publish("go-mqtt/sample", 0, false, text)
  token.Wait()
}