MQTT 和 Postgresql 数据库:使用 python 将 mqtt 消息插入到 postgresql table 的列中
MQTT & Postgresql DB: Insert a mqtt message into a column in postgresql table using python
我是 PostgreSQL 的新手,我想使用 pysycopg2 将 mqtt 消息插入到 PostgreSQL 数据库。不幸的是,它没有按预期工作。我认为这是一个简单的错误,但无法弄清楚错误到底是什么。首先,我使用 python 脚本 [1] 在 mosquitto 代理中发布了 mqtt 消息,然后从另一个脚本 [2] 订阅并尝试存储到 postgresql 中。相应的错误信息如[3].
这是我的 Publisher 脚本,用于将伪造的 mqtt-json 数据发布到 mosquitto 代理:
#!/usr/bin/python
import paho.mqtt.client as mqtt
import numpy as np
import time
broker_address = "localhost"
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()
while True:
time.sleep(0.05)
degrees = np.random.random_sample()
toa = np.random.random_sample()
humidity = np.random.random_sample()
json = ('''[{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]''')
locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'
locpk= str(locpk)
json= str(json)
client.publish("device14/geo", locpk, 1, 1)
client.publish("device14/geo", json, 1, 1)
这是我的订阅者脚本,用于订阅已发布的消息并插入到 PostgreSQL 中:
#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("device14/geo",0)
def on_message(client, userdata, msg):
Date = datetime.datetime.utcnow()
message= msg.payload.decode()
try:
#print the JSON Message with Data and Topic
print(str(Date) + ": " + msg.topic + " " + str(message))
#concatenate the SQL string
sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
#execute the INSERT statement
cur = conn.cursor()
cur.execute(sql_string)
#commit the changes to the database
conn.commit()
print("Finished writing to PostgreSQL")
except (Exception, Error) as err:
print("\npsycopg2 connect error:", err)
#print("Could not insert " + message + " into Postgresql DB")
#Set up a client for Postgresql DB
try:
#read connection parameters
params = config()
#connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
#create a cursor
cur = conn.cursor()
#execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')
cur.execute(sql)
#display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
try:
client.connect("localhost", 1883, 60)
connOK = True
except:
connOK = False
time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()
错误:
/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py
Connecting to the PostgreSQL database...
PostgreSQL database version:
no results to fetch
Connected with result code 0
Received a message on topic: device14/geo
2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]
psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
^**
Received a message on topic: device14/geo
2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}
psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...
期待您的评论。任何帮助将不胜感激。
PS:我也尝试过更改发布消息结构(即 locpk,json)但没有帮助。如果您对发布的消息结构应该如何有任何建议,请告诉我。我会试试的。
我没有看到 table_name
的 table 结构,但如果它只有一列 (column_name
) 并且您想将 JSON 文档存储在您需要在 PostgreSQL 中将其定义为 jsonb
。
在这样的列中插入数据很容易:
from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
data = (Json(message),)
cur.execute(query, data)
conn.commit()
...
但是,对整条消息使用单个列并不是一个好的设计选择。
为 time, device_addr, latitude, longitude, altitude
等常用键创建列(我只是根据提供的数据在这里猜测)。
将不太重要(可能丢失)的键存储在单独的 jsonb 列中(例如称为 data
)。
我是 PostgreSQL 的新手,我想使用 pysycopg2 将 mqtt 消息插入到 PostgreSQL 数据库。不幸的是,它没有按预期工作。我认为这是一个简单的错误,但无法弄清楚错误到底是什么。首先,我使用 python 脚本 [1] 在 mosquitto 代理中发布了 mqtt 消息,然后从另一个脚本 [2] 订阅并尝试存储到 postgresql 中。相应的错误信息如[3].
这是我的 Publisher 脚本,用于将伪造的 mqtt-json 数据发布到 mosquitto 代理:
#!/usr/bin/python
import paho.mqtt.client as mqtt
import numpy as np
import time
broker_address = "localhost"
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()
while True:
time.sleep(0.05)
degrees = np.random.random_sample()
toa = np.random.random_sample()
humidity = np.random.random_sample()
json = ('''[{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]''')
locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'
locpk= str(locpk)
json= str(json)
client.publish("device14/geo", locpk, 1, 1)
client.publish("device14/geo", json, 1, 1)
这是我的订阅者脚本,用于订阅已发布的消息并插入到 PostgreSQL 中:
#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("device14/geo",0)
def on_message(client, userdata, msg):
Date = datetime.datetime.utcnow()
message= msg.payload.decode()
try:
#print the JSON Message with Data and Topic
print(str(Date) + ": " + msg.topic + " " + str(message))
#concatenate the SQL string
sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
#execute the INSERT statement
cur = conn.cursor()
cur.execute(sql_string)
#commit the changes to the database
conn.commit()
print("Finished writing to PostgreSQL")
except (Exception, Error) as err:
print("\npsycopg2 connect error:", err)
#print("Could not insert " + message + " into Postgresql DB")
#Set up a client for Postgresql DB
try:
#read connection parameters
params = config()
#connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
#create a cursor
cur = conn.cursor()
#execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')
cur.execute(sql)
#display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
try:
client.connect("localhost", 1883, 60)
connOK = True
except:
connOK = False
time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()
错误:
/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py
Connecting to the PostgreSQL database...
PostgreSQL database version:
no results to fetch
Connected with result code 0
Received a message on topic: device14/geo
2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]
psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
^**
Received a message on topic: device14/geo
2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}
psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...
期待您的评论。任何帮助将不胜感激。
PS:我也尝试过更改发布消息结构(即 locpk,json)但没有帮助。如果您对发布的消息结构应该如何有任何建议,请告诉我。我会试试的。
我没有看到 table_name
的 table 结构,但如果它只有一列 (column_name
) 并且您想将 JSON 文档存储在您需要在 PostgreSQL 中将其定义为 jsonb
。
在这样的列中插入数据很容易:
from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
data = (Json(message),)
cur.execute(query, data)
conn.commit()
...
但是,对整条消息使用单个列并不是一个好的设计选择。
为 time, device_addr, latitude, longitude, altitude
等常用键创建列(我只是根据提供的数据在这里猜测)。
将不太重要(可能丢失)的键存储在单独的 jsonb 列中(例如称为 data
)。