paho-mqtt 不会给我排队的消息
paho-mqtt won't give me the queued messages
使用 paho-mqtt 并尝试让它接收排队的消息。我使用的代理是 emqx 4.2.2 这是我的脚本:
from paho.mqtt.client import Client, MQTTv5
def on_connect(mqttc, obj, flags, rc, other):
print(" Session present: " + str(flags['session present']))
print(" Connection result: " + str(rc))
mqttc.subscribe([
('/message/1', 1)
])
def on_message(*args, **kwargs):
print("received a message")
client = Client(
client_id='test-client-id',
protocol=MQTTv5,
)
client.username_pw_set(
username="test-user-2",
password="test"
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(
host='localhost',
port=1883,
keepalive=60,
clean_start=False,
)
client.loop_forever()
我现在去给经纪人发布消息:
mosquitto_pub -u test-user-2 -P test -t '/message/1' -m 'this is a message' -q 1 -V mqttv5
当客户端连接到代理时,它确实收到了消息,但考虑到我
使用 QoS 1 订阅并使用 QoS 1 发布消息 我希望如果我
断开我的客户端与代理的连接,然后向该主题发布更多 QoS 1 消息,然后使用相同的固定 client_id 将我的客户端重新连接到代理,然后我的客户端将收到在我的客户端排队时已排队的消息离开。好吧,这并没有发生,用 mosquitto_sub 和 -c 标志模拟相同的功能,一切都按预期工作,这让我问自己……paho-mqtt 有问题吗?我做错了什么吗?
QOS 仅在客户端已订阅时才有效...如果您想在订阅之前接收消息,则需要使用 Retain 标志集发布消息...然后您只会收到最后一条消息发送。如果你真的需要获取多条消息,那么你需要使用像 AMQ 这样的东西,而不是 MQTT。
它应该是 clean_session
而不是 clean_start
并且它需要传递给 Client
构造函数而不是 connect()
函数:
...
client = Client(
client_id='test-client-id',
protocol=MQTTv5,
clean_session=False
)
client.username_pw_set(
username="test-user-2",
password="test"
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(
host='localhost',
port=1883,
keepalive=60
)
...
在MQTT v5中,clean start仅表示是否在启动时删除session。要控制断开连接后会话持续多长时间,您需要设置会话过期时间间隔:
import paho.mqtt.properties as properties
...
connect_properties = properties.Properties(properties.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600
client.connect("localhost", 1883, 60, properties=connect_properties)
使用 paho-mqtt 并尝试让它接收排队的消息。我使用的代理是 emqx 4.2.2 这是我的脚本:
from paho.mqtt.client import Client, MQTTv5
def on_connect(mqttc, obj, flags, rc, other):
print(" Session present: " + str(flags['session present']))
print(" Connection result: " + str(rc))
mqttc.subscribe([
('/message/1', 1)
])
def on_message(*args, **kwargs):
print("received a message")
client = Client(
client_id='test-client-id',
protocol=MQTTv5,
)
client.username_pw_set(
username="test-user-2",
password="test"
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(
host='localhost',
port=1883,
keepalive=60,
clean_start=False,
)
client.loop_forever()
我现在去给经纪人发布消息:
mosquitto_pub -u test-user-2 -P test -t '/message/1' -m 'this is a message' -q 1 -V mqttv5
当客户端连接到代理时,它确实收到了消息,但考虑到我 使用 QoS 1 订阅并使用 QoS 1 发布消息 我希望如果我 断开我的客户端与代理的连接,然后向该主题发布更多 QoS 1 消息,然后使用相同的固定 client_id 将我的客户端重新连接到代理,然后我的客户端将收到在我的客户端排队时已排队的消息离开。好吧,这并没有发生,用 mosquitto_sub 和 -c 标志模拟相同的功能,一切都按预期工作,这让我问自己……paho-mqtt 有问题吗?我做错了什么吗?
QOS 仅在客户端已订阅时才有效...如果您想在订阅之前接收消息,则需要使用 Retain 标志集发布消息...然后您只会收到最后一条消息发送。如果你真的需要获取多条消息,那么你需要使用像 AMQ 这样的东西,而不是 MQTT。
它应该是 clean_session
而不是 clean_start
并且它需要传递给 Client
构造函数而不是 connect()
函数:
...
client = Client(
client_id='test-client-id',
protocol=MQTTv5,
clean_session=False
)
client.username_pw_set(
username="test-user-2",
password="test"
)
client.on_connect = on_connect
client.on_message = on_message
client.connect(
host='localhost',
port=1883,
keepalive=60
)
...
在MQTT v5中,clean start仅表示是否在启动时删除session。要控制断开连接后会话持续多长时间,您需要设置会话过期时间间隔:
import paho.mqtt.properties as properties
...
connect_properties = properties.Properties(properties.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600
client.connect("localhost", 1883, 60, properties=connect_properties)