在 on_message 中发布,Paho Mqtt 客户端不工作

Publish in on_message with Paho Mqtt Client not working

main 方法中的发布调用不会向代理发送消息,而是 returns (0,2)。 on_publish 不会被调用。 我不确定我应该如何查看错误所在。我尝试了 qos=2 和 运行 主要方法 async 但它没有修复它。基础连接工作,因为 on_message 被调用。 mqtt-server 也适用于其他项目。 有人有什么想法吗?

import paho.mqtt.client as mqtt
import subprocess
import shutil
import os
import glob
import logging
from datetime import datetime
import RPi.GPIO as GPIO
import time
from multiprocessing import Pool
GPIO.setmode(GPIO.BCM)

direction = 19
pwm = 26
GPIO.setup(direction, GPIO.OUT)
GPIO.setup(pwm, GPIO.OUT)

pwm = GPIO.PWM(pwm,19000)
speed = 0
pwm.start(speed)
running = False

client = mqtt.Client()
#First: pip3 install paho-mqtt
print("Starte Listener")

def start():
    GPIO.output(direction, GPIO.LOW)
    for dc in range(10, 40, 1):
        speed = dc 
        pwm.ChangeDutyCycle(speed)
        time.sleep(0.25)

def end():
    GPIO.output(direction, GPIO.LOW)
    for dc in list(reversed(range(0,40,1))):
        speed = dc 
        pwm.ChangeDutyCycle(speed)
        time.sleep(0.5)


def main(mqttClient):
    print("Started Spinning")
    running = True
    start()
    print(mqttClient.publish("scanner","shoot"))
    print("Waiting 124 seconds")
    time.sleep(124)
    print("ending spinning")
    end()
    print("finished spinning")
    running = False 

def on_connect2(client, userdata, flags, rc):
    client.subscribe("scanner",2)
    print("Connected "+str(rc))
    

# The callback for when a PUBLISH message is received from the server.
def on_message(mqttClient, userdata, msg):
    print("Recived message: "+str(msg.payload,'UTF-8'))
    if(str(msg.payload,'UTF-8') == "spin" and running == False):
        main(mqttClient)

def disconnected():
    print("Disconneted")

def on_publish(self, client, userdata, mid):
    print("onPublish")
    print(client,userdata,mid)

logger = logging.getLogger(__name__)
client.enable_logger(logger)

client.on_connect = on_connect2
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = disconnected
client.username_pw_set("....","....")
client.connect("gx1", 1883,60)
client.loop_forever()

作为快速修复,我创建了一个新客户端:

def main(mqttClient):
    print("Started Spinning")
    running = True
    start()
    mq = mqtt.Client("shot_idicator")
    mq.username_pw_set("...","...")
    mq.connect("gx1", 1883,60)
    mq.publish("scanner","shoot")
    mq.disconnect()
    mq = None
    print("Waiting 124 seconds")
    time.sleep(124)
    print("ending spinning")
    end()
    print("finished spinning")
    running = False 

现在可以了,但我认为不应该这样做。

这是因为您阻塞了 MQTT 客户端线程。

当您调用 client.loop_forever() 时,它会接管进程主线程并使用它来处理所有 MQTT 通信。当收到一条新消息时,MQTT 客户端线程会从网络堆栈中拾取它,并将其转换为消息对象,然后传递给 on_message() 回调。此函数 运行s 在客户端线程上。

当您调用 client.publish() 时,这将执行以下两件事之一

  1. 如果消息是 QOS 0 且小于网络 MTU,它将发布消息。
  2. 如果消息是 QOS 1 或 2 或大于网络 MTU,那么它会将消息排队等待客户端线程处理。

代码中的问题*是您将 main() 中的 return 阻塞了 124 秒,这反过来又阻塞了 on_message() 函数的 return所以客户端线程无法发布您的消息。

如果你想在 on_message()(或任何回调函数)中做一些需要很长时间或阻塞的事情,你应该启动一个单独的线程来 运行 它们。

*理论上你的消息看起来应该属于上面列出的情况 1,但可能还有其他因素导致它排队)