如何将多个发布者发布给一个订阅者?
How to publish with many publishers to one subscriber?
我需要构建一个包含多个发布者、一个经纪人和一个订阅者的网络。
问题:
我有一个带有一对一选项的工作代码(一个酒吧 - 一个子)。但是,当我 运行 更多发布者时,订阅者仅从最后连接的发布者接收消息,我不知道为什么会这样。
更多信息:Broker (mosquitto) 在 docker 容器中工作,每个发布者都是一个单独的脚本,目标是 运行 多个 docker 容器,一个发布者在每个容器,但现在我需要解决通信问题。
有没有人对如何调试或解决这个问题有任何线索或想法?
这是发布者脚本:
import time
import paho.mqtt.client as mqtt
import datetime
import random
import json
import logging
from multiprocessing import Process
CLEAN_SESSION = False
def on_connect(client, userdata, flags, rc):
logging.info(f"New connection {client}, {rc}")
def sensor(client_id):
localhost = '172.17.0.2'
port = 1883
timeout = 60
topic = "/mia/sensor"
client_id = f"sensor_{client_id}"
def check_sensor():
time.sleep(1)
rand = random.randint(0, 10)
if rand > 5:
current_time = datetime.datetime.now()
current_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
return {"time": current_time, "signal": 1, "id": client_id}
else:
return 0
client = mqtt.Client(client_id, clean_session=CLEAN_SESSION)
client.on_connect = on_connect
client.connect(localhost, port, timeout)
while True:
check_info = check_sensor()
if check_info:
message_payload = json.dumps(check_info)
logging.info(message_payload)
client.publish(topic, message_payload, qos=2)
client.loop()
client.disconnect()
if __name__ == "__main__":
p = Process(target=sensor, args=(1,))
p.start()
print("new publisher created!")
这是订阅者脚本:
import paho.mqtt.client as mqtt
import paho.mqtt.subscribe as sub
import json
import logging
localhost = '172.17.0.2'
port = 1883
timeout = 60
topic = "/mia/sensor"
def on_connect(client, userdata, flags, rc):
logging.info(f"New connection {client}, {rc}")
client.subscribe(topic, qos=2)
def on_message(client, userdata, msg):
data = json.loads(msg.payload.decode('utf-8'))
logging.debug(f"new message from {client} - {data}")
print(data)
client = mqtt.Client("python", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect_async(localhost, port, timeout)
client.loop_forever()
提前致谢
您的 client_id 需要独一无二。检查一下。
我需要构建一个包含多个发布者、一个经纪人和一个订阅者的网络。
问题: 我有一个带有一对一选项的工作代码(一个酒吧 - 一个子)。但是,当我 运行 更多发布者时,订阅者仅从最后连接的发布者接收消息,我不知道为什么会这样。
更多信息:Broker (mosquitto) 在 docker 容器中工作,每个发布者都是一个单独的脚本,目标是 运行 多个 docker 容器,一个发布者在每个容器,但现在我需要解决通信问题。
有没有人对如何调试或解决这个问题有任何线索或想法?
这是发布者脚本:
import time
import paho.mqtt.client as mqtt
import datetime
import random
import json
import logging
from multiprocessing import Process
CLEAN_SESSION = False
def on_connect(client, userdata, flags, rc):
logging.info(f"New connection {client}, {rc}")
def sensor(client_id):
localhost = '172.17.0.2'
port = 1883
timeout = 60
topic = "/mia/sensor"
client_id = f"sensor_{client_id}"
def check_sensor():
time.sleep(1)
rand = random.randint(0, 10)
if rand > 5:
current_time = datetime.datetime.now()
current_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
return {"time": current_time, "signal": 1, "id": client_id}
else:
return 0
client = mqtt.Client(client_id, clean_session=CLEAN_SESSION)
client.on_connect = on_connect
client.connect(localhost, port, timeout)
while True:
check_info = check_sensor()
if check_info:
message_payload = json.dumps(check_info)
logging.info(message_payload)
client.publish(topic, message_payload, qos=2)
client.loop()
client.disconnect()
if __name__ == "__main__":
p = Process(target=sensor, args=(1,))
p.start()
print("new publisher created!")
这是订阅者脚本:
import paho.mqtt.client as mqtt
import paho.mqtt.subscribe as sub
import json
import logging
localhost = '172.17.0.2'
port = 1883
timeout = 60
topic = "/mia/sensor"
def on_connect(client, userdata, flags, rc):
logging.info(f"New connection {client}, {rc}")
client.subscribe(topic, qos=2)
def on_message(client, userdata, msg):
data = json.loads(msg.payload.decode('utf-8'))
logging.debug(f"new message from {client} - {data}")
print(data)
client = mqtt.Client("python", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect_async(localhost, port, timeout)
client.loop_forever()
提前致谢
您的 client_id 需要独一无二。检查一下。