从 Influxdb 输出数据到 MQTT broker/server
Output data from Influxdb to MQTT broker/server
这是我第一次使用 MQTT,我想从 influxdb 获取数据到 Snowflake,但在此之前我必须:
- 从influxdb获取数据到MQTT,我在网上找不到任何例子。
我尝试使用以下脚本将数据从 mqtt 保存到 InfluxDB:
"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""
import re
from typing import NamedTuple
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- InfluxDB connection settings ---
# NOTE(review): credentials are hard-coded in the script; consider loading
# them from the environment or a config file.
INFLUXDB_ADDRESS = '10.10.10.247'
INFLUXDB_USER = 'iotuser'
INFLUXDB_PASSWORD = 'iotpassword'
INFLUXDB_DATABASE = 'homeiot_db'

# --- MQTT broker settings ---
MQTT_ADDRESS = '10.10.10.247'
MQTT_USER = 'iotuser'
MQTT_PASSWORD = 'iotpassword'
# Subscription filter: home/[room]/[temperature|humidity|light|status]
MQTT_TOPIC = 'home/+/+'
# Captures the two topic levels that follow "home/".
MQTT_REGEX = 'home/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'

# No database is selected at construction time; _init_influxdb_database()
# switches the client to INFLUXDB_DATABASE.
influxdb_client = InfluxDBClient(
    host=INFLUXDB_ADDRESS,
    port=8086,
    username=INFLUXDB_USER,
    password=INFLUXDB_PASSWORD,
    database=None,
)
class SensorData(NamedTuple):
    """One numeric reading parsed from an MQTT message."""

    # First topic level after "home/"; written to InfluxDB as the
    # "location" tag.
    location: str
    # Second topic level after "home/"; used as the InfluxDB measurement name.
    measurement: str
    # The message payload converted with float().
    value: float
def on_connect(client, userdata, flags, rc):
    """Handle the broker's CONNACK response.

    Prints the result code and subscribes ``client`` to MQTT_TOPIC.
    ``userdata`` and ``flags`` are accepted to match the callback
    signature but are not used.
    """
    print(f'Connected with result code {rc!s}')
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Handle a PUBLISH message received from the server.

    Prints the raw message, parses it into a SensorData record and, when
    parsing yields a record, writes it to InfluxDB. Messages whose payload
    cannot be decoded or parsed are reported and skipped.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data attached to the client (unused).
        msg: The received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print(msg.topic + ' ' + str(msg.payload))
    try:
        sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    except ValueError as err:
        # Covers both a payload that is not valid UTF-8 (UnicodeDecodeError is
        # a ValueError subclass) and a payload that is not a number. An
        # uncaught exception here can propagate out of the MQTT network loop
        # and stop the bridge, so one bad message must not be fatal.
        print('Ignoring malformed message on ' + msg.topic + ': ' + str(err))
        return
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)
def _parse_mqtt_message(topic, payload):
    """Convert an MQTT topic/payload pair into a SensorData record.

    Args:
        topic: Topic string, matched against MQTT_REGEX to extract the
            location and the measurement name.
        payload: Decoded payload text, expected to hold a number.

    Returns:
        A SensorData record, or None when the topic does not match
        MQTT_REGEX, the measurement is 'status', or the payload cannot be
        converted to a float.
    """
    match = re.match(MQTT_REGEX, topic)
    if not match:
        return None

    location, measurement = match.group(1), match.group(2)
    if measurement == 'status':
        # Status messages carry no numeric reading to store.
        return None

    try:
        value = float(payload)
    except (TypeError, ValueError):
        # A non-numeric payload is treated like any other unparseable
        # message instead of raising into the MQTT callback.
        return None
    return SensorData(location, measurement, value)
def _send_sensor_data_to_influxdb(sensor_data):
    """Write one SensorData record to InfluxDB as a single point.

    The measurement name comes from ``sensor_data.measurement``, the
    location is stored as a tag and the reading as the ``value`` field.
    The request body is printed before it is written.
    """
    point = {
        'measurement': sensor_data.measurement,
        'tags': {'location': sensor_data.location},
        'fields': {'value': sensor_data.value},
    }
    json_body = [point]
    print(json_body)
    influxdb_client.write_points(json_body)
def _init_influxdb_database():
    """Ensure INFLUXDB_DATABASE exists and make it the client's active database.

    Lists the databases known to the server, creates INFLUXDB_DATABASE if no
    entry carries that name, then switches the shared client to it.
    """
    databases = influxdb_client.get_list_database()
    # any() stops at the first match instead of building a filtered list
    # only to test its length.
    if not any(db['name'] == INFLUXDB_DATABASE for db in databases):
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)
def main():
    """Prepare the InfluxDB database, then run the MQTT client loop.

    Creates the MQTT client with MQTT_CLIENT_ID, registers the connect and
    message callbacks, sets the credentials, connects to MQTT_ADDRESS on
    port 1883 and calls ``loop_forever()``.
    """
    _init_influxdb_database()

    bridge = mqtt.Client(MQTT_CLIENT_ID)
    bridge.on_connect = on_connect
    bridge.on_message = on_message
    bridge.username_pw_set(MQTT_USER, MQTT_PASSWORD)

    bridge.connect(MQTT_ADDRESS, 1883)
    bridge.loop_forever()
if __name__ == '__main__':
    # Script entry point: announce the bridge, then hand control to main().
    print('MQTT to InfluxDB bridge')
    main()
如果有人以前做过这个或者知道如何解决这个问题,我将非常感谢任何帮助。
如果您只需要将数据从 InfluxDB 传输到 Snowflake,您可以尝试充分利用下面现有的 API:
import "sql"
data
|> sql.to(
driverName: "snowflake",
dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
table: "example_table",
)
更多详情请参阅 Flux `sql.to()` 函数的官方文档。
好吧,如果你坚持使用 MQTT,有一种官方且简便的方法:通过 Flux 来完成这个任务(可以在 InfluxDB 2.0 的 UI 中配置),而不必使用 Python:
import "mqtt"
from(bucket: "telegraf")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_system"))
|> filter(fn: (r) =>
(r.cpu == "cpu-total"))
|> last()
|> mqtt.to(
broker: "tcp://davidgs.com:8883",
topic: "cpu",
clientid: "cpu-flux",
valueColumns: ["_value"],
tagColumns: ["cpu", "host"],
)
这是我第一次使用 MQTT,我想从 influxdb 获取数据到 Snowflake,但在此之前我必须:
- 从influxdb获取数据到MQTT,我在网上找不到任何例子。
我尝试使用以下脚本将数据从 mqtt 保存到 InfluxDB:
"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""
import re
from typing import NamedTuple
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- InfluxDB connection settings ---
# NOTE(review): credentials are hard-coded in the script; consider loading
# them from the environment or a config file.
INFLUXDB_ADDRESS = '10.10.10.247'
INFLUXDB_USER = 'iotuser'
INFLUXDB_PASSWORD = 'iotpassword'
INFLUXDB_DATABASE = 'homeiot_db'

# --- MQTT broker settings ---
MQTT_ADDRESS = '10.10.10.247'
MQTT_USER = 'iotuser'
MQTT_PASSWORD = 'iotpassword'
# Subscription filter: home/[room]/[temperature|humidity|light|status]
MQTT_TOPIC = 'home/+/+'
# Captures the two topic levels that follow "home/".
MQTT_REGEX = 'home/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'

# No database is selected at construction time; _init_influxdb_database()
# switches the client to INFLUXDB_DATABASE.
influxdb_client = InfluxDBClient(
    host=INFLUXDB_ADDRESS,
    port=8086,
    username=INFLUXDB_USER,
    password=INFLUXDB_PASSWORD,
    database=None,
)
class SensorData(NamedTuple):
    """One numeric reading parsed from an MQTT message."""

    # First topic level after "home/"; written to InfluxDB as the
    # "location" tag.
    location: str
    # Second topic level after "home/"; used as the InfluxDB measurement name.
    measurement: str
    # The message payload converted with float().
    value: float
def on_connect(client, userdata, flags, rc):
    """Handle the broker's CONNACK response.

    Prints the result code and subscribes ``client`` to MQTT_TOPIC.
    ``userdata`` and ``flags`` are accepted to match the callback
    signature but are not used.
    """
    print(f'Connected with result code {rc!s}')
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Handle a PUBLISH message received from the server.

    Prints the raw message, parses it into a SensorData record and, when
    parsing yields a record, writes it to InfluxDB. Messages whose payload
    cannot be decoded or parsed are reported and skipped.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data attached to the client (unused).
        msg: The received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print(msg.topic + ' ' + str(msg.payload))
    try:
        sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    except ValueError as err:
        # Covers both a payload that is not valid UTF-8 (UnicodeDecodeError is
        # a ValueError subclass) and a payload that is not a number. An
        # uncaught exception here can propagate out of the MQTT network loop
        # and stop the bridge, so one bad message must not be fatal.
        print('Ignoring malformed message on ' + msg.topic + ': ' + str(err))
        return
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)
def _parse_mqtt_message(topic, payload):
    """Convert an MQTT topic/payload pair into a SensorData record.

    Args:
        topic: Topic string, matched against MQTT_REGEX to extract the
            location and the measurement name.
        payload: Decoded payload text, expected to hold a number.

    Returns:
        A SensorData record, or None when the topic does not match
        MQTT_REGEX, the measurement is 'status', or the payload cannot be
        converted to a float.
    """
    match = re.match(MQTT_REGEX, topic)
    if not match:
        return None

    location, measurement = match.group(1), match.group(2)
    if measurement == 'status':
        # Status messages carry no numeric reading to store.
        return None

    try:
        value = float(payload)
    except (TypeError, ValueError):
        # A non-numeric payload is treated like any other unparseable
        # message instead of raising into the MQTT callback.
        return None
    return SensorData(location, measurement, value)
def _send_sensor_data_to_influxdb(sensor_data):
    """Write one SensorData record to InfluxDB as a single point.

    The measurement name comes from ``sensor_data.measurement``, the
    location is stored as a tag and the reading as the ``value`` field.
    The request body is printed before it is written.
    """
    point = {
        'measurement': sensor_data.measurement,
        'tags': {'location': sensor_data.location},
        'fields': {'value': sensor_data.value},
    }
    json_body = [point]
    print(json_body)
    influxdb_client.write_points(json_body)
def _init_influxdb_database():
    """Ensure INFLUXDB_DATABASE exists and make it the client's active database.

    Lists the databases known to the server, creates INFLUXDB_DATABASE if no
    entry carries that name, then switches the shared client to it.
    """
    databases = influxdb_client.get_list_database()
    # any() stops at the first match instead of building a filtered list
    # only to test its length.
    if not any(db['name'] == INFLUXDB_DATABASE for db in databases):
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)
def main():
    """Prepare the InfluxDB database, then run the MQTT client loop.

    Creates the MQTT client with MQTT_CLIENT_ID, registers the connect and
    message callbacks, sets the credentials, connects to MQTT_ADDRESS on
    port 1883 and calls ``loop_forever()``.
    """
    _init_influxdb_database()

    bridge = mqtt.Client(MQTT_CLIENT_ID)
    bridge.on_connect = on_connect
    bridge.on_message = on_message
    bridge.username_pw_set(MQTT_USER, MQTT_PASSWORD)

    bridge.connect(MQTT_ADDRESS, 1883)
    bridge.loop_forever()
if __name__ == '__main__':
    # Script entry point: announce the bridge, then hand control to main().
    print('MQTT to InfluxDB bridge')
    main()
如果有人以前做过这个或者知道如何解决这个问题,我将非常感谢任何帮助。
如果您只需要将数据从 InfluxDB 传输到 Snowflake,您可以尝试充分利用下面现有的 API:
import "sql"
data
|> sql.to(
driverName: "snowflake",
dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
table: "example_table",
)
更多详情请参阅 Flux `sql.to()` 函数的官方文档。
好吧,如果你坚持使用 MQTT,有一种官方且简便的方法:通过 Flux 来完成这个任务(可以在 InfluxDB 2.0 的 UI 中配置),而不必使用 Python:
import "mqtt"
from(bucket: "telegraf")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_system"))
|> filter(fn: (r) =>
(r.cpu == "cpu-total"))
|> last()
|> mqtt.to(
broker: "tcp://davidgs.com:8883",
topic: "cpu",
clientid: "cpu-flux",
valueColumns: ["_value"],
tagColumns: ["cpu", "host"],
)