Paho MQTT "failed to receive on socket: [Errno 32] Broken pipe"
Paho MQTT "failed to receive on socket: [Errno 32] Broken pipe"
我设置了一个 Paho MQTT 客户端来接收消息,它看起来像下面的代码...
class PrimaryListener:
def __init__(self):
self.client = mqtt.Client("paho-test-subscriber", False)
self.client.on_connect= self.on_connect
self.client.on_message= self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
self.client.subscribe("test-topic")
def on_connect(self, client, userdata, flags, rc):
if rc==0:
print("connected OK Returned code=",rc,flush=True)
else:
print("Bad connection Returned code=",rc, flush=True)
def on_message(self, client, userdata, message):
msg_str = message.payload.decode("utf-8")
print("message received : " , str(msg_str))
print("message topic : ", message.topic)
def on_disconnect(self, client, userdata,rc=0):
self.client.loop_stop()
def subscribe(self):
self.client.loop_forever()
if __name__ == "__main__":
primaryListener = PrimaryListener()
primaryListener.subscribe()
每个发布者看起来都是这样的,尝试每十秒发送一次消息...
class Publisher:
def __init__(self):
self.client = mqtt.Client("paho-test-publisher", False)
self.client.on_log = self.on_log
self.client.on_connect = self.on_connect
self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
self.publishing_increment = 7
def on_log(self,client, userdata, level, buf):
print("log: ", buf)
def on_connect(self,client, userdata, flags, rc):
if rc==0:
print("connected OK Returned code=",rc, flush=True)
else:
print("Bad connection Returned code=",rc, flush=True)
def send_message(self, log_str):
file_dict = json.loads(log_str)
for item in file_dict:
self.client.publish("test-topic", json.dumps(item))
time.sleep(self.publishing_increment)
def publishFromFile(self,file_name):
with open(file_name, "r") as jsonfile:
file_str = jsonfile.read()
file_dict = json.loads(file_str)
numStr = str(randint(0, 10))
while True:
for item in file_dict:
self.client.publish("test-topic", numStr) #json.dumps(item))
time.sleep(self.publishing_increment)
time.sleep(10)
if __name__ == "__main__":
publisher = Publisher()
publisher.publishFromFile("disconnected_test.txt")
我正在尝试测试这些。当我 运行 PrimaryListener 文件和 运行 另一个终端中的一个 Publisher window 时,它 运行 没问题。当我尝试 运行 第二个 Publisher 时,第一个 Publisher 记录输出行....
log: failed to receive on socket: [Errno 32] Broken pipe
然后停止发送消息。我在这里做错了什么?
MQTT 要求每个客户端都有一个唯一的客户端 ID。
第二个客户端使用相同 ID 连接时的确切行为由代理决定。您看到的是一个常见的:代理启动第一个并接受第二个的连接。
您正在尝试连接具有相同 ID 的两个发布者:"paho-test-publisher"。
如果您不在意具体的ID名称,您可以使用空白ID连接,“”,经纪人会为您的客户分配一个随机名称。
这正是我遇到的问题,然后修复让一切变得不同。不要使用 'client.connect()'
我设置了一个 Paho MQTT 客户端来接收消息,它看起来像下面的代码...
class PrimaryListener:
def __init__(self):
self.client = mqtt.Client("paho-test-subscriber", False)
self.client.on_connect= self.on_connect
self.client.on_message= self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
self.client.subscribe("test-topic")
def on_connect(self, client, userdata, flags, rc):
if rc==0:
print("connected OK Returned code=",rc,flush=True)
else:
print("Bad connection Returned code=",rc, flush=True)
def on_message(self, client, userdata, message):
msg_str = message.payload.decode("utf-8")
print("message received : " , str(msg_str))
print("message topic : ", message.topic)
def on_disconnect(self, client, userdata,rc=0):
self.client.loop_stop()
def subscribe(self):
self.client.loop_forever()
if __name__ == "__main__":
primaryListener = PrimaryListener()
primaryListener.subscribe()
每个发布者看起来都是这样的,尝试每十秒发送一次消息...
class Publisher:
def __init__(self):
self.client = mqtt.Client("paho-test-publisher", False)
self.client.on_log = self.on_log
self.client.on_connect = self.on_connect
self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
self.publishing_increment = 7
def on_log(self,client, userdata, level, buf):
print("log: ", buf)
def on_connect(self,client, userdata, flags, rc):
if rc==0:
print("connected OK Returned code=",rc, flush=True)
else:
print("Bad connection Returned code=",rc, flush=True)
def send_message(self, log_str):
file_dict = json.loads(log_str)
for item in file_dict:
self.client.publish("test-topic", json.dumps(item))
time.sleep(self.publishing_increment)
def publishFromFile(self,file_name):
with open(file_name, "r") as jsonfile:
file_str = jsonfile.read()
file_dict = json.loads(file_str)
numStr = str(randint(0, 10))
while True:
for item in file_dict:
self.client.publish("test-topic", numStr) #json.dumps(item))
time.sleep(self.publishing_increment)
time.sleep(10)
if __name__ == "__main__":
publisher = Publisher()
publisher.publishFromFile("disconnected_test.txt")
我正在尝试测试这些。当我 运行 PrimaryListener 文件和 运行 另一个终端中的一个 Publisher window 时,它 运行 没问题。当我尝试 运行 第二个 Publisher 时,第一个 Publisher 记录输出行....
log: failed to receive on socket: [Errno 32] Broken pipe
然后停止发送消息。我在这里做错了什么?
MQTT 要求每个客户端都有一个唯一的客户端 ID。
第二个客户端使用相同 ID 连接时的确切行为由代理决定。您看到的是一个常见的:代理启动第一个并接受第二个的连接。
您正在尝试连接具有相同 ID 的两个发布者:"paho-test-publisher"。
如果您不在意具体的ID名称,您可以使用空白ID连接,“”,经纪人会为您的客户分配一个随机名称。
这正是我遇到的问题,然后修复让一切变得不同。不要使用 'client.connect()'