MQTT Paho Python 客户端订阅者,如何永远订阅?
MQTT Paho Python Client subscriber, how subscribe forever?
尝试在不断开与 Mosquitto 代理的连接的情况下进行简单订阅,以获取来自在特定主题中发布数据的设备的所有消息,将它们保存在 BD 中并 Post 它们到 php做 "staff".
这是我的subscribe.py
import paho.mqtt.client as mqtt
from mqtt_myapp import *
topic="topic/#" # MQTT broker topic
myclient="my-paho-client" # MQTT broker My Client
user="user" # MQTT broker user
pw="pass" # MQTT broker password
host="localhost" # MQTT broker host
port=1883 # MQTT broker port
value="123" # somethin i need for myapp
def on_connect(mqttc, userdata, rc):
print('connected...rc=' + str(rc))
mqttc.subscribe(topic, qos=0)
def on_disconnect(mqttc, userdata, rc):
print('disconnected...rc=' + str(rc))
def on_message(mqttc, userdata, msg):
print('message received...')
print('topic: ' + msg.topic + ', qos: ' +
str(msg.qos) + ', message: ' + str(msg.payload))
save_to_db(msg)
post_data(msg.payload,value)
def on_subscribe(mqttc, userdata, mid, granted_qos):
print('subscribed (qos=' + str(granted_qos) + ')')
def on_unsubscribe(mqttc, userdata, mid, granted_qos):
print('unsubscribed (qos=' + str(granted_qos) + ')')
mqttc = mqtt.Client(myclient)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.username_pw_set(user,pw)
mqttc.connect(host, port, 60)
mqttc.loop_forever()
这是我的 mqtt_myapp.py:
import MySQLdb
import requests # pip install requests
url = "mydomain/data_from_broker.php"
def save_to_db(msg):
with db:
cursor = db.cursor()
try:
cursor.execute("INSERT INTO MQTT_LOGS (topic, payload) VALUES (%s,%s)", (msg.topic, msg.payload))
except (MySQLdb.Error, MySQLdb.Warning) as e:
print('excepttion BD ' + e)
return None
def post_data(payload,value):
datos = {'VALUE': value,'data-from-broker': payload}
r = requests.post(url, datos)
r.status_code
print('response POST' + str(r.status_code))
db = MySQLdb.connect("localhost","user_db","pass_db","db" )
当我 运行 我的 python 脚本在后台使用 python -t mqtt_subscribe.py &
时,我收到了为其他客户端发布的消息,但是 在我的 subscribe.py 脚本运行ning,发生套接字错误。
Mosquito.log:
...
1475614815: Received PINGREQ from my-paho-client
1475614815: Sending PINGRESP to my-paho-client
1475614872: New connection from xxx.xxx.xxx.xxx on port 1883.
1475614872: Client device1 disconnected.
1475614872: New client connected from xxx.xxx.xxx.xxx as device1(c0, k0, u'user1').
1475614872: Sending CONNACK to device1(0, 0)
1475614873: Received PUBLISH from device1(d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475614873: Sending PUBACK to device1 (Mid: 1)
1475614873: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475614874: Received DISCONNECT from device1
1475614874: Client device1 disconnected.
...
1475625566: Received PINGREQ from my-paho-client
1475625566: Sending PINGRESP to my-paho-client
1475625626: Received PINGREQ from my-paho-client
1475625626: Sending PINGRESP to my-paho-client
1475625675: New connection from xxx.xxx.xxx.xxx on port 1883.
1475625675: Client device1 disconnected.
1475625675: New client connected from xxx.xxx.xxx.xxx as device1 (c0, k0, u'user1').
1475625675: Sending CONNACK to device1 (0, 0)
1475625677: Received PUBLISH from device1 (d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475625677: Sending PUBACK to device1 (Mid: 1)
1475625677: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475625677: Socket error on client my-paho-client, disconnecting.
1475625677: Received DISCONNECT from device1
...
可能是什么问题?有什么想法或建议吗?
提前致谢
如果您在方法 "on_message" 中的代码抛出异常而您没有捕获它,您将断开连接。
尝试取消注释除 print 语句之外的所有语句。可能以下语句之一抛出异常。
save_to_db(msg)
post_data(msg.payload,value)
插入数据库时,使用 ?
而不是 %s
。
link for reference
cursor.execute('''INSERT INTO users(name, phone, email, password)
VALUES(?,?,?,?)''', (name1,phone1, email1, password1))
尝试在不断开与 Mosquitto 代理的连接的情况下进行简单订阅,以获取来自在特定主题中发布数据的设备的所有消息,将它们保存在 BD 中并 Post 它们到 php做 "staff".
这是我的subscribe.py
import paho.mqtt.client as mqtt
from mqtt_myapp import *
topic="topic/#" # MQTT broker topic
myclient="my-paho-client" # MQTT broker My Client
user="user" # MQTT broker user
pw="pass" # MQTT broker password
host="localhost" # MQTT broker host
port=1883 # MQTT broker port
value="123" # somethin i need for myapp
def on_connect(mqttc, userdata, rc):
print('connected...rc=' + str(rc))
mqttc.subscribe(topic, qos=0)
def on_disconnect(mqttc, userdata, rc):
print('disconnected...rc=' + str(rc))
def on_message(mqttc, userdata, msg):
print('message received...')
print('topic: ' + msg.topic + ', qos: ' +
str(msg.qos) + ', message: ' + str(msg.payload))
save_to_db(msg)
post_data(msg.payload,value)
def on_subscribe(mqttc, userdata, mid, granted_qos):
print('subscribed (qos=' + str(granted_qos) + ')')
def on_unsubscribe(mqttc, userdata, mid, granted_qos):
print('unsubscribed (qos=' + str(granted_qos) + ')')
mqttc = mqtt.Client(myclient)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.username_pw_set(user,pw)
mqttc.connect(host, port, 60)
mqttc.loop_forever()
这是我的 mqtt_myapp.py:
import MySQLdb
import requests # pip install requests
url = "mydomain/data_from_broker.php"
def save_to_db(msg):
with db:
cursor = db.cursor()
try:
cursor.execute("INSERT INTO MQTT_LOGS (topic, payload) VALUES (%s,%s)", (msg.topic, msg.payload))
except (MySQLdb.Error, MySQLdb.Warning) as e:
print('excepttion BD ' + e)
return None
def post_data(payload,value):
datos = {'VALUE': value,'data-from-broker': payload}
r = requests.post(url, datos)
r.status_code
print('response POST' + str(r.status_code))
db = MySQLdb.connect("localhost","user_db","pass_db","db" )
当我 运行 我的 python 脚本在后台使用 python -t mqtt_subscribe.py &
时,我收到了为其他客户端发布的消息,但是 在我的 subscribe.py 脚本运行ning,发生套接字错误。
Mosquito.log:
...
1475614815: Received PINGREQ from my-paho-client
1475614815: Sending PINGRESP to my-paho-client
1475614872: New connection from xxx.xxx.xxx.xxx on port 1883.
1475614872: Client device1 disconnected.
1475614872: New client connected from xxx.xxx.xxx.xxx as device1(c0, k0, u'user1').
1475614872: Sending CONNACK to device1(0, 0)
1475614873: Received PUBLISH from device1(d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475614873: Sending PUBACK to device1 (Mid: 1)
1475614873: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475614874: Received DISCONNECT from device1
1475614874: Client device1 disconnected.
...
1475625566: Received PINGREQ from my-paho-client
1475625566: Sending PINGRESP to my-paho-client
1475625626: Received PINGREQ from my-paho-client
1475625626: Sending PINGRESP to my-paho-client
1475625675: New connection from xxx.xxx.xxx.xxx on port 1883.
1475625675: Client device1 disconnected.
1475625675: New client connected from xxx.xxx.xxx.xxx as device1 (c0, k0, u'user1').
1475625675: Sending CONNACK to device1 (0, 0)
1475625677: Received PUBLISH from device1 (d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475625677: Sending PUBACK to device1 (Mid: 1)
1475625677: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475625677: Socket error on client my-paho-client, disconnecting.
1475625677: Received DISCONNECT from device1
...
可能是什么问题?有什么想法或建议吗?
提前致谢
如果您在方法 "on_message" 中的代码抛出异常而您没有捕获它,您将断开连接。 尝试取消注释除 print 语句之外的所有语句。可能以下语句之一抛出异常。
save_to_db(msg)
post_data(msg.payload,value)
插入数据库时,使用 ?
而不是 %s
。
link for reference
cursor.execute('''INSERT INTO users(name, phone, email, password)
VALUES(?,?,?,?)''', (name1,phone1, email1, password1))