节点红色到 asyncio-mqtt 调试
node red to asyncio-mqtt debug
我正在同一 Ubuntu 实例上试验 Node Red 和 运行 mosquito 代理。客户端 MQTT 设备我 运行 asyncio-mqtt 在单独的 Ubuntu 设备上。
在红色节点中,只有 2 个 mqtt 块用于输入和输出:
31
的值来自 asyncio-mqtt advanced_example
python script(向下滚动到高级用途)在这个 repo 中发布数据到主题 [=14 的消息总线=] 这似乎很好用。
但是我很难尝试将消息从节点 red 发布到 Python 脚本 运行 advanced_example
.
在蓝色箭头所在的节点红色屏幕截图中,我可以使用注入节点将字符串 Testing123
发布到有效的主题 floors/get/temps
,在我的 Python 脚本中 运行高级示例我有一个await client.subscribe("floors/#")
。我可以在来自 Node Red 的 Python 脚本的 cmd 提示符中看到 Testing123
字符串。
我被卡住的地方是 我似乎无法创建一些布尔语句来标记特定消息。例如,这是合法的吗?我正在尝试标记已发布的 Node Red 消息 Testing123 但出于某种原因 if payload_ == "Testing123"
永远不会为真。 type(payload_)
也是字符串。
async def log_messages(messages, template):
async for message in messages:
payload_ = template.format(message.payload.decode())
print("log_messages def hit ",payload_)
if payload_ == "Testing123":
print("YES payload_ == Testing123, DO SOMETHING")
else:
print("payload_ not == Testing123")
非常感谢任何提示,我觉得我缺少某种回调或“标记”某些传入 MQTT 消息的东西。
这是全部 advanced_example
抱歉,代码很多:
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError
import configs
import json
broker_url = configs.login_info["broker_url"]
username = configs.login_info["username"]
password = configs.login_info["password"]
async def advanced_example():
# We context managers. Let's create a stack to help
# us manage them.
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client(broker_url,
username=username,
password=password)
await stack.enter_async_context(client)
# You can create any number of topic filters
topic_filters = (
"floors/+/humidity",
"floors/rooftop/#",
"floors/get/temps"
# Try to add more filters!
)
for topic_filter in topic_filters:
# Log all messages that matches the filter
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
template = f'[topic_filter="{topic_filter}"] {{}}'
task = asyncio.create_task(log_messages(messages, template))
tasks.add(task)
# Messages that doesn't match a filter will get logged here
messages = await stack.enter_async_context(client.unfiltered_messages())
task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
tasks.add(task)
# Subscribe to topic(s)
# Note that we subscribe *after* starting the message
# loggers. Otherwise, we may miss retained messages.
await client.subscribe("floors/#")
# Publish a random value to each of these topics
topics = (
"floors/basement/humidity",
"floors/rooftop/humidity",
"floors/rooftop/illuminance",
# Try to add more topics!
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def post_to_topics(client, topics):
while True:
for topic in topics:
message = randrange(100)
print(f'[topic="{topic}"] Publishing message={message}')
await client.publish(topic, message, qos=1)
await asyncio.sleep(60)
async def log_messages(messages, template):
async for message in messages:
payload_ = template.format(message.payload.decode())
print("log_messages def hit ",payload_)
if payload_ == "Testing123":
print("YES payload_ == Testing123, DO SOMETHING")
else:
print("payload_ not == Testing123")
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def main():
# Run the advanced_example indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 3 # [seconds]
while True:
try:
await advanced_example()
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
asyncio.run(main())
根据评论将 if payload_ == "Testing123":
更改为 if message.payload.decode() == "Testing123"
。
您的代码不起作用,因为 payload_ = template.format(message.payload.decode())
将模板应用于消息有效负载 - 如果消息是 Testing123
,则 payload_
将被设置为类似 [topic_filter="floors/get/temps"] Testing123
的内容(这将使您的平等测试失败)。
我正在同一 Ubuntu 实例上试验 Node Red 和 运行 mosquito 代理。客户端 MQTT 设备我 运行 asyncio-mqtt 在单独的 Ubuntu 设备上。
在红色节点中,只有 2 个 mqtt 块用于输入和输出:
31
的值来自 asyncio-mqtt advanced_example
python script(向下滚动到高级用途)在这个 repo 中发布数据到主题 [=14 的消息总线=] 这似乎很好用。
但是我很难尝试将消息从节点 red 发布到 Python 脚本 运行 advanced_example
.
在蓝色箭头所在的节点红色屏幕截图中,我可以使用注入节点将字符串 Testing123
发布到有效的主题 floors/get/temps
,在我的 Python 脚本中 运行高级示例我有一个await client.subscribe("floors/#")
。我可以在来自 Node Red 的 Python 脚本的 cmd 提示符中看到 Testing123
字符串。
我被卡住的地方是 我似乎无法创建一些布尔语句来标记特定消息。例如,这是合法的吗?我正在尝试标记已发布的 Node Red 消息 Testing123 但出于某种原因 if payload_ == "Testing123"
永远不会为真。 type(payload_)
也是字符串。
async def log_messages(messages, template):
async for message in messages:
payload_ = template.format(message.payload.decode())
print("log_messages def hit ",payload_)
if payload_ == "Testing123":
print("YES payload_ == Testing123, DO SOMETHING")
else:
print("payload_ not == Testing123")
非常感谢任何提示,我觉得我缺少某种回调或“标记”某些传入 MQTT 消息的东西。
这是全部 advanced_example
抱歉,代码很多:
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError
import configs
import json
broker_url = configs.login_info["broker_url"]
username = configs.login_info["username"]
password = configs.login_info["password"]
async def advanced_example():
# We context managers. Let's create a stack to help
# us manage them.
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client(broker_url,
username=username,
password=password)
await stack.enter_async_context(client)
# You can create any number of topic filters
topic_filters = (
"floors/+/humidity",
"floors/rooftop/#",
"floors/get/temps"
# Try to add more filters!
)
for topic_filter in topic_filters:
# Log all messages that matches the filter
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
template = f'[topic_filter="{topic_filter}"] {{}}'
task = asyncio.create_task(log_messages(messages, template))
tasks.add(task)
# Messages that doesn't match a filter will get logged here
messages = await stack.enter_async_context(client.unfiltered_messages())
task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
tasks.add(task)
# Subscribe to topic(s)
# Note that we subscribe *after* starting the message
# loggers. Otherwise, we may miss retained messages.
await client.subscribe("floors/#")
# Publish a random value to each of these topics
topics = (
"floors/basement/humidity",
"floors/rooftop/humidity",
"floors/rooftop/illuminance",
# Try to add more topics!
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def post_to_topics(client, topics):
while True:
for topic in topics:
message = randrange(100)
print(f'[topic="{topic}"] Publishing message={message}')
await client.publish(topic, message, qos=1)
await asyncio.sleep(60)
async def log_messages(messages, template):
async for message in messages:
payload_ = template.format(message.payload.decode())
print("log_messages def hit ",payload_)
if payload_ == "Testing123":
print("YES payload_ == Testing123, DO SOMETHING")
else:
print("payload_ not == Testing123")
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def main():
# Run the advanced_example indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 3 # [seconds]
while True:
try:
await advanced_example()
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
asyncio.run(main())
根据评论将 if payload_ == "Testing123":
更改为 if message.payload.decode() == "Testing123"
。
您的代码不起作用,因为 payload_ = template.format(message.payload.decode())
将模板应用于消息有效负载 - 如果消息是 Testing123
,则 payload_
将被设置为类似 [topic_filter="floors/get/temps"] Testing123
的内容(这将使您的平等测试失败)。