MQTT和时间延迟

MQTT and time delay

我有一个 Python 脚本,想用它来处理 paho-mqtt 消息和时间延迟。我已经查阅过讨论 paho-mqtt 的类似问题。我的 Python 脚本如下:

这是我的脚本:

    import paho.mqtt.client as mqttClient
    import time
    import subprocess
    import os
    
    
    # State shared between the callbacks defined below.
    # NOTE(review): this `global` statement sits at module level, where the
    # names are already module globals, so it looks redundant.
    global lastprocessed, Connected
    lastprocessed = None  # time.time() of the most recent "hello"; None until one arrives
    Connected = False   #global variable for the state of the connection
    
    def on_connect(client, userdata, flags, rc):
        """Print the connection result and set ``Connected`` when ``rc`` is 0.

        Any non-zero ``rc`` only prints "Connection failed"; ``Connected`` is
        left unchanged in that case.
        """
        if rc == 0:
            print("Connected to broker")
            global Connected
            Connected = True                #Signal connection
        else:
            print("Connection failed")
    
    
    def on_message(client, userdata, message):
        """Timestamp "hello" payloads and print "good" within 20 s of the last one.

        All output happens inside this function, so nothing is printed unless a
        message triggers a call to it.
        """
        global lastprocessed
        # Every "hello" overwrites the timestamp, so the 20-second window is
        # restarted each time "hello" arrives.
        if message.payload.decode() == "hello":
            lastprocessed = time.time()
    
        # True for ANY payload handled less than 20 seconds after the last
        # "hello"; there is no branch here that prints anything other than "good".
        if lastprocessed and time.time() - lastprocessed < 20:
            print("good")

# Connection settings and start-up: set credentials and callbacks, connect,
# subscribe to one topic, then hand control to the client's loop.
broker_address= "192.168.1.111"  #Broker address
port = 1883                         #Broker port
user = "abcde"                    #Connection username
password = "12345"            #Connection password

client = mqttClient.Client("Python")               #create new instance
client.username_pw_set(user, password=password)    #set username and password
client.on_connect= on_connect                      #attach function to callback
client.on_message= on_message                      #attach function to callback
client.connect(broker_address,port,60) #connect
client.subscribe("home/OpenMQTTGateway/433toMQTT") #subscribe
# No statement follows this call, so from here on only the callbacks above can
# produce output.
client.loop_forever() #then keep listening forever

上面代码的实际行为是:只要在有效负载中收到 “hello”,它就会打印 “good”;但此后无论有效负载中再收到什么内容(包括 “hello”),它都会继续打印 “good”,并没有遵守 20 秒的时间限制。我不确定这是为什么。

我要实现的目标如下:

  1. 开始执行 Python 脚本时,脚本应该打印 “bad”。
  2. 此后应停止打印,直到在有效负载中收到 “hello”。
  3. 收到 “hello” 后,应持续 20 秒打印 “good”;在这 20 秒内应忽略该主题上收到的任何其他消息,包括 “hello”。
  4. 20 秒后,脚本应再次打印 “bad”,但只打印一次,然后继续循环。

更新问题 24/03/2021:

于是,我下面的新脚本一直在打印 “Connected to broker”。起初并不是这样,我也没有改动任何东西,但现在不知为何它不停地打印这条消息,而且 if 语句根本没有被执行。

我知道我的脚本本身可以正常运行,我只需要让 if 语句得到执行并持续循环,也就是不再连续打印 “Connected to broker”。

import paho.mqtt.client as mqttClient
import time
import subprocess
import os
import json


# State shared between the callbacks defined below.
# NOTE(review): this `global` statement sits at module level, where the names
# are already module globals, so it looks redundant.
global lastprocessed, Connected
lastprocessed = None  # not assigned anywhere else in this script
Connected = False   #global variable for the state of the connection

def on_connect(client, userdata, flags, rc):
    """Print the connection result and set ``Connected`` when ``rc`` is 0.

    This is the only place in the script that prints "Connected to broker",
    so that line appears once per call with ``rc == 0``.
    """
    if rc == 0:
        print("Connected to broker")
        global Connected
        Connected = True                #Signal connection
    else:
        print("Connection failed")


# Counters updated by on_message: x counts messages whose "contact" value is
# False, y counts messages whose "contact" value is True.
x = y = 0

def on_message(client, userdata, message):
    """Parse the payload as JSON, count "contact" values and switch display power.

    Reads the "contact" key of the decoded JSON object, increments ``x`` or
    ``y`` accordingly, and runs ``vcgencmd display_power`` depending on the
    counters.
    """
    global x, y

    # Declared global but never read or written in this function.
    global lastprocessed

    # NOTE(review): if message.payload is a bytes object, str() returns its
    # repr (b'...'), which json.loads cannot parse; message.payload.decode()
    # may be what is intended. TODO confirm the payload type.
    a = str(message.payload)

    data = json.loads(a);

    j = data["contact"]
    print(j)

    if j == False:
        x += 1
        print(x)
    if j == True:
        y += 1
        print(y)
    # `and` binds tighter than `or`, so this reads: x == 1 or (x == 0 and y == 1).
    if x == 1 or x == 0 and y == 1:
        os.system("vcgencmd display_power 1")
    # Second True message: reset both counters and issue display_power 0.
    if y == 2:
        x = y = 0
        os.system("vcgencmd display_power 0")

# Connection settings and start-up: set credentials and callbacks, connect,
# subscribe to one topic, then hand control to the client's loop.
broker_address= "192.168.XXX.XXX"  #Broker address
port = 1883                         #Broker port
user = "XXXX"                    #Connection username
password = "XXXX"            #Connection password

client = mqttClient.Client("Python")               #create new instance
client.username_pw_set(user, password=password)    #set username and password
client.on_connect= on_connect                      #attach function to callback
client.on_message= on_message                      #attach function to callback
client.connect(broker_address,port,60) #connect
client.subscribe("zigbee2mqtt/Xiaomi Door Sensor 2") #subscribe
# No statement follows this call, so from here on only the callbacks above can
# produce output.
client.loop_forever() #then keep listening forever

我使用 loop_start() 和 loop_stop() 而不是 loop_forever(),这样就可以在 start 和 stop 之间创建自己的循环,用来检查消息并打印文本。

我使用变量 state 来记录代码所处的阶段:尚未收到第一个 hello(state = "start");已收到 hello,此时必须检查时间并重复打印文本 “good”(state = "hello");或者 hello 之后的 20 秒已过,不再打印任何内容(state = "other")。

在 on_message 中,只有当收到消息 hello 且旧状态不是 hello 时,我才把状态改为 hello。

import paho.mqtt.client as mqttClient
import time

# --- functions ---

def on_connect(client, userdata, flags, rc):
    """Report the outcome of a connection attempt.

    A result code of 0 is treated as success: a confirmation is printed and
    the module-level ``connected`` flag is set to True. Any other code only
    prints a failure message and leaves the flag untouched.
    """
    global connected

    # Failure path first, so the success path reads straight through.
    if rc != 0:
        print("Connection failed")
        return

    print("Connected to broker")
    connected = True

def on_message(client, userdata, message):
    """Open the "hello" window when a "hello" payload arrives.

    Intended as the client's ``on_message`` callback. A payload that decodes
    to "hello" switches the module-level ``state`` to 'hello' and stores the
    arrival time in ``last_processed``, unless ``state`` is already 'hello',
    in which case the message is ignored so the running window is not
    restarted. Every other payload is ignored.

    Args:
        client: the MQTT client instance (unused).
        userdata: user data attached to the client (unused).
        message: received message; only ``message.payload`` is read.
    """
    global state
    global last_processed

    if message.payload.decode() != "hello":
        return

    # Checking `state` before decoding the payload would behave the same for
    # any decodable payload; either order of the two tests is fine.
    if state != 'hello':
        # Store the timestamp BEFORE switching the state. The main loop polls
        # `state` from another thread and reads `last_processed` as soon as it
        # sees 'hello'; with the opposite order it could observe 'hello' while
        # `last_processed` is still None and end the window immediately.
        last_processed = time.time()
        state = 'hello'

# --- main ---

# Broker connection settings.
broker_address = "192.168.1.111"  # broker address
port = 1883                       # broker port
user = "abcde"                    # connection username
password = "12345"                # connection password

# ---

client = mqttClient.Client("Python")               # create new instance
client.username_pw_set(user, password=password)    # set username and password
client.on_connect= on_connect                      # attach function to callback
client.on_message= on_message                      # attach function to callback
client.connect(broker_address, port, 60)           # connect
client.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe

# --- main loop ---

# Globals shared with the callbacks; they are assigned before loop_start()
# is called below.
last_processed = None  # time.time() of the accepted "hello"; None outside the window
connected = False      # signal connection
state = 'start' # 'start' (no hello yet), 'hello' (inside the 20 s window), 'other' (window over)

# ---

# Per the paho-mqtt documentation, loop_start() processes network traffic in a
# background thread, which leaves this thread free for the polling loop below.
client.loop_start()

try:
    while True:
        if not connected:
            print('not connected')

        # No "hello" accepted yet: "bad" is printed on every pass (once a second).
        if state == 'start':
            print('bad')

        if state == 'hello':
            if last_processed and time.time() - last_processed < 20:
                # Inside the window: show "good" and the elapsed seconds.
                print("good", time.time() - last_processed, end='\r') # '\r' to write it in the same line.
            else:
                # Window over: print "bad" once and leave the 'hello' state.
                print('bad')
                state = 'other'
                last_processed = None

        # After the window nothing is printed until on_message accepts a new "hello".
        if state == 'other':
            pass

        time.sleep(1.0) # to slow down example

except KeyboardInterrupt:
    print('KeyboardInterrupt')

# The loop above only ends through KeyboardInterrupt; stop the client's loop afterwards.
client.loop_stop()

编辑:

如果 bad 和 good 各只需要打印一次,那么可以在循环之前先打印 bad,在收到 hello 消息时打印 good,然后先执行 time.sleep(20),再打印 bad 并更改状态。

import paho.mqtt.client as mqttClient
import time

# --- functions ---

def on_connect(client, userdata, flags, rc):
    """Print whether the connection attempt succeeded and flag success.

    ``rc == 0`` means success: "Connected to broker" is printed and the
    module-level ``connected`` flag becomes True. Otherwise only
    "Connection failed" is printed and ``connected`` is not modified.
    """
    global connected

    succeeded = rc == 0
    if not succeeded:
        print("Connection failed")
        return

    print("Connected to broker")
    connected = True

def on_message(client, userdata, message):
    """Handle one incoming message according to the current ``state``.

    The payload is always decoded first. While ``state`` is 'hello' the
    message is then ignored. Otherwise a "hello" payload switches ``state``
    to 'hello' and prints "good" once; any other payload is echoed with a
    'msg:' prefix.
    """
    global state

    text = message.payload.decode()

    # Inside the "hello" window every message is dropped.
    if state == 'hello':
        return

    if text != 'hello':
        print('msg:', text)
        return

    state = 'hello'
    print('good')  # printed once, when the window opens

# --- main ---

# Broker connection settings.
broker_address = "192.168.1.111"  # broker address
port = 1883                       # broker port
user = "abcde"                    # connection username
password = "12345"                # connection password

# ---

client = mqttClient.Client("Python")               # create new instance
client.username_pw_set(user, password=password)    # set username and password
client.on_connect= on_connect                      # attach function to callback
client.on_message= on_message                      # attach function to callback
client.connect(broker_address, port, 60)           # connect
client.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe

# --- main loop ---

# Globals shared with the callbacks; assigned before loop_start() is called.
connected = False      # signal connection
state = 'other' # 'other' = idle; on_message switches it to 'hello' when "hello" arrives

# ---

# Per the paho-mqtt documentation, loop_start() processes network traffic in a
# background thread, which leaves this thread free for the polling loop below.
client.loop_start()

print('bad') # once at start

try:
    while True:

        # on_message has already printed "good"; wait out the window here.
        # `state` is polled once a second, so "bad" follows "good" after
        # roughly 20 to 21 seconds.
        if state == 'hello':
            time.sleep(20)
            print('bad')  # once when end `hello`
            state = 'other'

        time.sleep(1.0) # to slow down example

except KeyboardInterrupt:
    print('KeyboardInterrupt')

# The loop above only ends through KeyboardInterrupt; stop the client's loop afterwards.
client.loop_stop()

使用 threading.Timer 而不是 time.sleep() 会更合适,因为 sleep() 会阻塞循环;如果你还想在循环里做更多事情,sleep() 就不那么好用了。

import paho.mqtt.client as mqttClient
import threading

# --- functions ---

def on_connect(client, userdata, flags, rc):
    """Announce the connection result; mark the client connected on success.

    Success is ``rc == 0``: prints "Connected to broker" and sets the
    module-level ``connected`` flag to True. Any other ``rc`` prints
    "Connection failed" and does not touch the flag.
    """
    global connected

    if rc != 0:
        # Nothing else to do on failure.
        print("Connection failed")
        return

    print("Connected to broker")
    connected = True

def on_message(client, userdata, message):
    """Handle one incoming message; a "hello" opens a timed window.

    The payload is always decoded first. While ``state`` is 'hello' the
    message is then ignored. Otherwise a "hello" payload switches ``state``
    to 'hello', prints "good" once and schedules ``end_hello`` to run 20
    seconds later on a ``threading.Timer``; any other payload is echoed with
    a 'msg:' prefix.
    """
    global state

    text = message.payload.decode()

    # Inside the "hello" window every message is dropped.
    if state == 'hello':
        return

    if text != 'hello':
        print('msg:', text)
        return

    state = 'hello'
    print('good')  # printed once, when the window opens
    window_timer = threading.Timer(20, end_hello)
    window_timer.start()
        
def end_hello():
    """Close the "hello" window: print "bad" and set ``state`` to 'other'.

    Takes no arguments and returns nothing; it only prints and rebinds the
    module-level ``state``.
    """
    global state

    # Printed once, at the moment the window ends.
    print('bad')
    state = 'other'

# --- main ---

# Broker connection settings.
broker_address = "192.168.1.111"  # broker address
port = 1883                       # broker port
user = "abcde"                    # connection username
password = "12345"                # connection password

# ---

client = mqttClient.Client("Python")               # create new instance
client.username_pw_set(user, password=password)    # set username and password
client.on_connect= on_connect                      # attach function to callback
client.on_message= on_message                      # attach function to callback
client.connect(broker_address, port, 60)           # connect
client.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe

# --- main loop ---

# Globals shared with the callbacks; assigned before the client's loop starts.
connected = False      # signal connection
state = 'other' # 'other' = idle; on_message switches it to 'hello', end_hello switches it back

# ---

print('bad') # once at start
# No polling loop is needed in this variant: the 20-second timeout is handled
# by the threading.Timer that on_message starts, so no statement follows this call.
client.loop_forever()

或者,你仍然可以在循环中检查时间:

import paho.mqtt.client as mqttClient
import time

# --- functions ---

def on_connect(client, userdata, flags, rc):
    """Print the connection result and record a successful connection.

    When ``rc`` equals 0 the function prints "Connected to broker" and sets
    the module-level ``connected`` flag to True. For any other ``rc`` it
    prints "Connection failed" and returns without changing the flag.
    """
    global connected

    connection_failed = rc != 0
    if connection_failed:
        print("Connection failed")
        return

    print("Connected to broker")
    connected = True

def on_message(client, userdata, message):
    """Handle one incoming message; a "hello" opens the timed window.

    Intended as the client's ``on_message`` callback. The payload is always
    decoded first. While ``state`` is 'hello' the message is then ignored.
    Otherwise a "hello" payload stores the arrival time in
    ``last_processed``, switches ``state`` to 'hello' and prints "good" once;
    any other payload is echoed with a 'msg:' prefix.

    Args:
        client: the MQTT client instance (unused).
        userdata: user data attached to the client (unused).
        message: received message; only ``message.payload`` is read.
    """
    global state
    global last_processed

    text = message.payload.decode()

    # Inside the "hello" window every message is dropped.
    if state == 'hello':
        return

    if text == 'hello':
        # Store the timestamp BEFORE switching the state. The main loop polls
        # `state` from another thread and evaluates `last_processed + 20` as
        # soon as it sees 'hello'; with the opposite order it could find
        # `last_processed` still None and raise TypeError.
        last_processed = time.time()
        state = 'hello'
        print('good')  # printed once, when the window opens
    else:
        print('msg:', text)

# --- main ---

# Broker connection settings.
broker_address = "192.168.1.111"  # broker address
port = 1883                       # broker port
user = "abcde"                    # connection username
password = "12345"                # connection password

# ---

client = mqttClient.Client("Python")               # create new instance
client.username_pw_set(user, password=password)    # set username and password
client.on_connect= on_connect                      # attach function to callback
client.on_message= on_message                      # attach function to callback
client.connect(broker_address, port, 60)           # connect
client.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe

# --- main loop ---

# Globals shared with the callbacks; assigned before loop_start() is called.
last_processed = None  # time.time() of the accepted "hello"; assigned by on_message
connected = False      # signal connection
state = 'other' # 'other' = idle; on_message switches it to 'hello' when "hello" arrives

# ---

# Per the paho-mqtt documentation, loop_start() processes network traffic in a
# background thread, which leaves this thread free for the polling loop below.
client.loop_start()

print('bad') # once at start

try:
    while True:

        if state == 'hello':
            # This comparison relies on on_message having stored a numeric
            # timestamp in last_processed whenever state is 'hello'.
            if time.time() >= last_processed + 20:
                print('bad')  # once when end `hello`
                state = 'other'

        # Polling once a second keeps the loop responsive without blocking
        # for the whole 20-second window.
        time.sleep(1.0) # to slow down example

except KeyboardInterrupt:
    print('KeyboardInterrupt')

# The loop above only ends through an exception; after KeyboardInterrupt the
# client's loop is stopped here.
client.loop_stop()