MQTT & PostgreSQL DB: Insert an MQTT message into a column in a PostgreSQL table using Python

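I am new to PostgreSQL and I want to insert MQTT messages into a PostgreSQL database using psycopg2. Unfortunately, it does not work as expected. I think it is a simple mistake, but I cannot figure out what exactly it is. First, I publish MQTT messages to the mosquitto broker with a Python script [1], then I subscribe from another script [2] and try to store the messages in PostgreSQL. The corresponding error output is shown in [3].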

[1] Here is my publisher script, which publishes fake MQTT JSON data to the mosquitto broker:

#!/usr/bin/python

import paho.mqtt.client as mqtt
import numpy as np
import time

broker_address = "localhost"

def on_connect(client, userdata, flags, rc):
  print("Connected with result code " + str(rc))

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()

while True:
  time.sleep(0.05)
  degrees = np.random.random_sample()
  toa = np.random.random_sample()
  humidity = np.random.random_sample()
  json =  ('''[{"time": "2020-04-01 21:00:00",  "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]''')

  locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'

  locpk= str(locpk)
  json= str(json)
  client.publish("device14/geo", locpk, 1, 1)
  client.publish("device14/geo", json, 1, 1)

[2] Here is my subscriber script, which subscribes to the published messages and inserts them into PostgreSQL:

#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("device14/geo",0)

def on_message(client, userdata, msg):
    Date = datetime.datetime.utcnow()
    message= msg.payload.decode()
    try:
            #print the JSON Message with Data and Topic
            print(str(Date) + ": " + msg.topic + " " + str(message))
            #concatenate the SQL string
            sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
            #execute the INSERT statement
            cur = conn.cursor()
            cur.execute(sql_string)
            #commit the changes to the database
            conn.commit()
            print("Finished writing to PostgreSQL")
    except (Exception, Error) as err:
            print("\npsycopg2 connect error:", err)
            #print("Could not insert " + message + " into Postgresql DB")

#Set up a client for Postgresql DB
try:
    #read connection parameters
    params = config()
    #connect to the PostgreSQL server
    print('Connecting to the PostgreSQL database...')
    conn = psycopg2.connect(**params)
    #create a cursor
    cur = conn.cursor()
    #execute a statement
    print('PostgreSQL database version:')
    cur.execute('SELECT version()')
    cur.execute(sql)
    #display the PostgreSQL database server version
    db_version = cur.fetchone()

    print(db_version)

except (Exception, psycopg2.DatabaseError) as error:
    print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()

#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)

client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()

[3] Error:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py


Connecting to the PostgreSQL database...

PostgreSQL database version:

no results to fetch

Connected with result code 0

Received a message on topic: device14/geo


2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00",   "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]


psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
               ^


Received a message on topic: device14/geo

2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}

psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...

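Looking forward to your comments. Any help would be greatly appreciated.

PS: I have also tried changing the structure of the published messages (i.e. locpk, json), but it did not help. If you have any suggestions on how the published messages should be structured, please let me know and I will try them.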

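I have not seen the structure of table_name, but if it has only one column (column_name) and you want to store the JSON document in it, that column needs to be defined as jsonb in PostgreSQL. Your statement fails because the % operator pastes the raw payload straight after VALUES, without parentheses or quoting, so PostgreSQL tries to parse the leading [ or { as SQL. Passing the value as a query parameter avoids this.
Inserting data into such a column is easy: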

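import json
from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
# message is the decoded payload string; parse it first so that jsonb stores
# the object or array itself rather than a JSON string scalar
data = (Json(json.loads(message)),)
cur.execute(query, data)
conn.commit()
...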
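
The question publishes two payload shapes on the same topic: a JSON array wrapping one object, and a bare object. If each row is meant to hold a single object, a small helper could normalize both shapes before the insert. The following is only a minimal sketch under that assumption; the name payload_to_documents is hypothetical and is not part of paho-mqtt or psycopg2.

```python
import json


def payload_to_documents(raw):
    """Parse a decoded MQTT payload into a list of dicts.

    Accepts either a single JSON object or a JSON array of objects.
    Invalid JSON raises json.JSONDecodeError (a subclass of ValueError).
    """
    parsed = json.loads(raw)
    if isinstance(parsed, dict):
        # Bare object, like the locpk payload in the question.
        return [parsed]
    if isinstance(parsed, list) and all(isinstance(item, dict) for item in parsed):
        # Array of objects, like the other payload in the question.
        return parsed
    raise ValueError("expected a JSON object or an array of JSON objects")


if __name__ == "__main__":
    for doc in payload_to_documents('[{"device_addr": "buizg8b8", "SF": "10"}]'):
        print(doc)
```

Each returned dict could then be wrapped in Json(...) and passed to cur.execute as in the snippet above, one row per object.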

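However, using a single column for the whole message is not a good design choice.
Create columns for the commonly used keys such as time, device_addr, latitude, longitude, altitude (I am only guessing here, based on the data provided).
Store the less important (possibly missing) keys in a separate jsonb column (called data, for example).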
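
One way to picture that layout is a helper that splits a parsed message into column values plus a JSON remainder. This is a sketch under assumptions, not a definitive implementation: the table name measurements, the column names, and the COLUMN_MAP mapping are all hypothetical and guessed from the first sample payload in the question.

```python
import json

# Hypothetical mapping from payload keys to table columns, guessed from the
# first sample payload in the question; adjust it to the real schema.
COLUMN_MAP = {
    "time": "time",
    "device_addr": "device_addr",
    "Latitude": "latitude",
    "Longitude": "longitude",
}


def build_insert(doc, table="measurements"):
    """Return (query, params) for one parsed message.

    Keys listed in COLUMN_MAP go to their own columns; every other key is
    serialized into the jsonb column named data.
    """
    columns = list(COLUMN_MAP.values()) + ["data"]
    # A key missing from the message becomes None, which is sent as NULL.
    values = [doc.get(key) for key in COLUMN_MAP]
    extras = {k: v for k, v in doc.items() if k not in COLUMN_MAP}
    values.append(json.dumps(extras))
    placeholders = ", ".join(["%s"] * len(columns))
    # Table and column names come from constants in this file, never from
    # the payload; only the values are passed as query parameters.
    query = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), placeholders
    )
    return query, tuple(values)


if __name__ == "__main__":
    query, params = build_insert({"time": "2020-04-01 21:00:00", "SF": "10"})
    print(query)
    print(params)
```

With an open connection, the result could be used as cur.execute(query, params) followed by conn.commit(). Keeping the values in params rather than in the SQL string is what avoids the syntax error from the question.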