节点红色到 asyncio-mqtt 调试

node red to asyncio-mqtt debug

我正在同一 Ubuntu 实例上试验 Node Red 和 运行 mosquito 代理。客户端 MQTT 设备我 运行 asyncio-mqtt 在单独的 Ubuntu 设备上。

在红色节点中,只有 2 个 mqtt 块用于输入和输出:

31 的值来自 asyncio-mqtt advanced_example python script(向下滚动到高级用途)在这个 repo 中发布数据到主题 [=14 的消息总线=] 这似乎很好用。

但是我很难尝试将消息从节点 red 发布到 Python 脚本 运行 advanced_example.

在蓝色箭头所在的节点红色屏幕截图中,我可以使用注入节点将字符串 Testing123 发布到有效的主题 floors/get/temps,在我的 Python 脚本中 运行高级示例我有一个await client.subscribe("floors/#")。我可以在来自 Node Red 的 Python 脚本的 cmd 提示符中看到 Testing123 字符串。

我被卡住的地方是 我似乎无法创建一些布尔语句来标记特定消息。例如,这是合法的吗?我正在尝试标记已发布的 Node Red 消息 Testing123 但出于某种原因 if payload_ == "Testing123" 永远不会为真。 type(payload_) 也是字符串。

async def log_messages(messages, template):
    async for message in messages:
        payload_ = template.format(message.payload.decode())
        print("log_messages def hit ",payload_)
        if payload_ == "Testing123":
            print("YES payload_ == Testing123, DO SOMETHING")
        else:
            print("payload_ not == Testing123")

非常感谢任何提示,我觉得我缺少某种回调或“标记”某些传入 MQTT 消息的东西。

这是全部 advanced_example 抱歉,代码很多:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError
import configs
import json



broker_url = configs.login_info["broker_url"]
username = configs.login_info["username"]
password = configs.login_info["password"]


async def advanced_example():
    # We  context managers. Let's create a stack to help
    # us manage them.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client(broker_url,
                username=username,
                password=password)
        await stack.enter_async_context(client)

        # You can create any number of topic filters
        topic_filters = (
            "floors/+/humidity",
            "floors/rooftop/#",
            "floors/get/temps"
            #  Try to add more filters!
        )
        for topic_filter in topic_filters:
            # Log all messages that matches the filter
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            template = f'[topic_filter="{topic_filter}"] {{}}'
            task = asyncio.create_task(log_messages(messages, template))
            tasks.add(task)

        # Messages that doesn't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
        tasks.add(task)

        # Subscribe to topic(s)
        #  Note that we subscribe *after* starting the message
        # loggers. Otherwise, we may miss retained messages.
        await client.subscribe("floors/#")

        # Publish a random value to each of these topics
        topics = (
            "floors/basement/humidity",
            "floors/rooftop/humidity",
            "floors/rooftop/illuminance",
            #  Try to add more topics!
        )
        task = asyncio.create_task(post_to_topics(client, topics))
        tasks.add(task)

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
            message = randrange(100)
            print(f'[topic="{topic}"] Publishing message={message}')
            await client.publish(topic, message, qos=1)
            await asyncio.sleep(60)



async def log_messages(messages, template):
    async for message in messages:
        payload_ = template.format(message.payload.decode())
        print("log_messages def hit ",payload_)
        if payload_ == "Testing123":
            print("YES payload_ == Testing123, DO SOMETHING")
        else:
            print("payload_ not == Testing123")
            
            


async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    # Run the advanced_example indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 3  # [seconds]
    while True:
        try:
            await advanced_example()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


asyncio.run(main())

根据评论将 if payload_ == "Testing123": 更改为 if message.payload.decode() == "Testing123"

您的代码不起作用,因为 payload_ = template.format(message.payload.decode()) 将模板应用于消息有效负载 - 如果消息是 Testing123,则 payload_ 将被设置为类似 [topic_filter="floors/get/temps"] Testing123 的内容(这将使您的平等测试失败)。