等待消息时发送消息

Send messages while waiting for messages

我正在使用 mqtt 协议将传感器数据消息发送到 mosquitto 代理。我想做的是每 t 秒发送一次传感器数据,但是如果我收到一条消息,它会在 parallel.I 中进行处理使用 time.sleep() 但我认为这也会延迟 "on_message" 函数。我正在使用 paho-mqtt 和 python 2.7。关于如何完成这样的事情有什么想法吗?

1 号客户端。(发送传感器数据)

from mysignals import mysignals
import paho.mqtt.client as mqtt
import time  

def on_connect(client, userdata, rc):
    mqttc.subscribe(topic='/+/mysignals/status', qos=0)
    mqttc.subscribe(topic='/+/mysignals/add_sensor',qos=0)
    mqttc.subscribe(topic='/+/mysignals/remove_sensor',qos=0)

def on_message(client,userdata,message):
    print 'received data'
    base_topic = '/mysignals'
    member_id = message.topic.split('/')[1]
    status_topic = '/mysignals/status'
    add_sensor_topic = '/mysignals/add_sensor'
    remove_sensor_topic = '/mysignals/remove_sensor'
    log_topic = '/log'
    if status_topic in message.topic:
        action = mysignals_test.change_status(int(member_id),int(message.payload))
        mqttc.publish(topic='/'+member_id+status_topic+log_topic+'/',payload=action,qos=0)
    elif add_sensor_topic in message.topic:
        action = mysignals_test.add_sensor(message.playload,int(member_id))
        mqttc.publish(topic='/'+member_id+add_sensor_topic+log_topic+'/',payload=action,qos=0)
    elif remove_sensor_topic in message.topic:
        action = mysignals_test.remove_sensor(message.payload,int(member_id))
        mqttc.publish(topic='/'+member_id+remove_sensor_topic+log_topic+'/',payload=action,qos=0)
    else:
        mqttc.publish(topic='/'+member_id+base_topic+log_topic+'/',payload='Wrong Action.',qos=0)

mysignals_test = mysignals(email='blablabla',password='blabla')
mysignals_test.add_sensor('temp',150)
mysignals_test.change_status(150,1)
mqttc = mqtt.Client(client_id='mysignals')
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect('broker ip')
mqttc.loop_start()

while True:
    for member in mysignals_test.members:
        if member.status == 1:
            live_data = mysignals_test.live(member.member_id)
            for data in live_data:
                topic = '/'+str(data.member_id)+'/mysignals/'+str(data.sensor_id)+'/'
                qos = 0
                retain = False
                if 'raw' in data.sensor_id:
                    payload = data.values
                else:
                    payload = data.value
                mqttc.publish(topic=topic,payload=payload,qos=qos,retain=retain)
    print 'sent data'
    time.sleep(55)

客户端 2(从客户端 1.Only 接收数据 subscriber.Also 也向 client1.only 订阅者发送测试消息。)

import paho.mqtt.client as mqtt

def on_connect(client, userdata, rc):
    mqttc.subscribe(topic='/+/mysignals/+/', qos=0)
    mqttc.subscribe(topic='/+/mysignals/status/+/', qos=0)
def on_disconnect(client,userdata,rc):
    pass
def on_message(client, userdata, message):
    if str(message.payload) == '':
        print 'empty message'
    else:
        print 'Received message :' + str(message.payload) + ', on topic: '+ message.topic + ', with QoS: ' + str(message.qos) 
        mqttc.publish(topic='/154/mysignals/status',payload=0,qos=0)

mqttc = mqtt.Client(client_id='P1')
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
mqttc.connect('broker ip')
mqttc.loop_start()

仅作记录 "mysignals" 对象是我创建的,它不存在 there.The 上面代码的问题是,当客户端 2 收到传感器数据时,它挂起并发送测试返回代理的消息 indefinetely.The 客户端 2 在收到传感器数据时发送的测试消息应由客户端 1 读取,并且客户端 1 应相应地操作 "mysignals" 对象。

客户端 2 输出:

Received message :25.4, on topic: /150/mysignals/temp/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0
Received message :Success, on topic: /154/mysignals/status/log/, with

客户端 1 输出:

Login Successfull.
sent data
sent data
received data
received data
received data
received data
received data
received data
received data
received data

PS:我不包括 "mysignals.py" 因为大约有 200 行。

我对 Paho MQTT 库中回调函数的理解是后台循环(由 loop_start 启动)会中断主线程,所以我不会担心使用 time.sleep()。因此,如果您主要关心的是不延迟 on_message 回调,那应该不是问题。我经常在 Python 脚本中使用 sleep 和 MQTT 回调。

当然回调可能会稍微延迟传感器发送数据,但你真的需要数据以绝对精确的时间发送吗?大多数数据库——例如 RRD——可以很容易地适应稍有偏差的更新时间。或者,如果您的 on_message 回调处理时间长得令人无法接受,请考虑将 MQTT 消息的有效负载从回调函数中传递出去,并在脚本的其他位置进行处理。

如果您确实需要传感器更新的瞬间精度,请考虑将功能分成两个脚本(或线程),每个脚本(或线程)仅用于一个目的(发送或接收)。

好的,我设法得到了预期的结果。我不得不在状态子主题上创建 2 个子主题。一个客户应该写在状态子主题的日志子主题上,一个客户应该写在状态子主题的值子主题上.我还删除了 subscribes.I 上的 qos 参数还添加了 time.sleep() 并且我很少错过一条消息。只有在非常短的时间间隔内收到 2 条消息时。最大的问题是我没有了解主题和副主题的实际运作方式。