mqtt客户端不能同时处理多条消息
Mqtt client can't handle multiple messages at the same time
我试图让 mqtt 客户端从它订阅的主题接收所有消息,但每次我使用另一个 client.The 发送消息时它只接收第一条消息,问题是客户端应该使用 qos 2 处理 10 条消息,但只处理第一条 one.The 消息同时发送,时间间隔为几毫秒。我不会经常发送 messages.I 每 minute.Both 客户端发送 10 条消息是持久的。我确定消息会离开发布者,因为无论何时发送消息,我都会打印它的有效负载。我正在使用 qos 2,因为收到的消息随后会保存到数据库中,我不想重复。我用的broker是activemq。所以问题是为什么会这样?
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import update
from sqlalchemy.ext.automap import generate_relationship
import sqlalchemy
import paho.mqtt.client as mqtt
import time
#Function that define what to do on client conenction
def on_connect(client, userdata, rc):
#Subscribe to all specified topics
mqttc.subscribe(topic='/+/mysignals/sensors/+/')
def on_message(client,userdata,message):
#Get the mysignals member id from the topic
topic_split = message.topic.split('/')
member_id = topic_split[1]
session = Session(engine)
sensor_id = topic_split[4]
patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first()
if message.payload == None:
payload = 0
else:
payload = message.payload
if patient:
current_time = time.time()
if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55:
pending[patient.id]['record'].__dict__[sensor_id] = payload
print time.time()
else:
pending.pop(patient.id,None)
patientdata = PatientData()
patientdata.__dict__[sensor_id] = payload
print patientdata.__dict__[sensor_id]
print payload
print patientdata.temp
patient.patientdata_collection.append(patientdata)
session.add(patientdata)
print time.time()
pending.update({patient.id:{
'time_created':time.time(),
'record':patientdata,
}})
session.flush()
session.commit()
print('Wrote to database.')
pending = {}
Base = automap_base()
engine = create_engine('mysql+mysqlconnector://user:pass@localhost/db')
# reflect the tables
Base.prepare(engine, reflect=True)
Patient = Base.classes.patient
PatientData = Base.classes.patientdata
session = Session(engine)
#Create a mqtt client object
mqttc = mqtt.Client(client_id='database_logger',clean_session=False)
#Set mqtt client callbacks
mqttc.on_connect = on_connect
mqttc.on_message = on_message
#Set mqtt broker username and password
mqttc.username_pw_set('blah','blahblah')
#Connect to the mqtt broker with the specified hostname/ip adress
mqttc.connect('127.0.0.1')
mqttc.loop_forever()
输出:
98
98
None
1500576377.3
Wrote to database.
1500576377.43
Wrote to database.
输出应该是:
98
98
None
1500576377.3
Wrote to database.
25.4
25.4
25.4
1500576377.43
Wrote to database.
最终不是mqtt客户端的问题。
代码错误,第二条消息未写入数据库。
为了使其正常工作,我必须替换以下行:
pending[patient.id]['record'].__dict__[sensor_id] = payload
这个:
setattr(pending[patient.id]['record'],sensor_id,payload)
同时删除以下行:
session = Session(engine)
在 on_message 函数之外。
我还添加了以下行:
session.expunge_all()
下一行:
session.commit()
为了每次在数据库中完成事务时清理会话。
我试图让 mqtt 客户端从它订阅的主题接收所有消息,但每次我使用另一个 client.The 发送消息时它只接收第一条消息,问题是客户端应该使用 qos 2 处理 10 条消息,但只处理第一条 one.The 消息同时发送,时间间隔为几毫秒。我不会经常发送 messages.I 每 minute.Both 客户端发送 10 条消息是持久的。我确定消息会离开发布者,因为无论何时发送消息,我都会打印它的有效负载。我正在使用 qos 2,因为收到的消息随后会保存到数据库中,我不想重复。我用的broker是activemq。所以问题是为什么会这样?
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import update
from sqlalchemy.ext.automap import generate_relationship
import sqlalchemy
import paho.mqtt.client as mqtt
import time
#Function that define what to do on client conenction
def on_connect(client, userdata, rc):
#Subscribe to all specified topics
mqttc.subscribe(topic='/+/mysignals/sensors/+/')
def on_message(client,userdata,message):
#Get the mysignals member id from the topic
topic_split = message.topic.split('/')
member_id = topic_split[1]
session = Session(engine)
sensor_id = topic_split[4]
patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first()
if message.payload == None:
payload = 0
else:
payload = message.payload
if patient:
current_time = time.time()
if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55:
pending[patient.id]['record'].__dict__[sensor_id] = payload
print time.time()
else:
pending.pop(patient.id,None)
patientdata = PatientData()
patientdata.__dict__[sensor_id] = payload
print patientdata.__dict__[sensor_id]
print payload
print patientdata.temp
patient.patientdata_collection.append(patientdata)
session.add(patientdata)
print time.time()
pending.update({patient.id:{
'time_created':time.time(),
'record':patientdata,
}})
session.flush()
session.commit()
print('Wrote to database.')
pending = {}
Base = automap_base()
engine = create_engine('mysql+mysqlconnector://user:pass@localhost/db')
# reflect the tables
Base.prepare(engine, reflect=True)
Patient = Base.classes.patient
PatientData = Base.classes.patientdata
session = Session(engine)
#Create a mqtt client object
mqttc = mqtt.Client(client_id='database_logger',clean_session=False)
#Set mqtt client callbacks
mqttc.on_connect = on_connect
mqttc.on_message = on_message
#Set mqtt broker username and password
mqttc.username_pw_set('blah','blahblah')
#Connect to the mqtt broker with the specified hostname/ip adress
mqttc.connect('127.0.0.1')
mqttc.loop_forever()
输出:
98
98
None
1500576377.3
Wrote to database.
1500576377.43
Wrote to database.
输出应该是:
98
98
None
1500576377.3
Wrote to database.
25.4
25.4
25.4
1500576377.43
Wrote to database.
最终不是mqtt客户端的问题。 代码错误,第二条消息未写入数据库。
为了使其正常工作,我必须替换以下行:
pending[patient.id]['record'].__dict__[sensor_id] = payload
这个:
setattr(pending[patient.id]['record'],sensor_id,payload)
同时删除以下行:
session = Session(engine)
在 on_message 函数之外。
我还添加了以下行:
session.expunge_all()
下一行:
session.commit()
为了每次在数据库中完成事务时清理会话。