Paho MQTT on uvicorn server
I have a very simple FastAPI Python server:
import io
import os
import json
import time

from PIL import Image
import paho.mqtt.client as mqtt
from fastapi import FastAPI, File, HTTPException, UploadFile, Form

# Initialize FastAPI
app = FastAPI()

# Initialize ENV variables
args = {
    'broker': os.environ.get('BROKER', '127.0.0.1'),
    'port': int(os.environ.get('PORT', '1883')),
    'topic': os.environ.get('TOPIC', 'topic'),
    # Used below as the MQTT client ID. The key was missing from the snippet,
    # so the variable name and default here are assumptions.
    'model': os.environ.get('MODEL', 'model')
}

# Initialize MQTT
print('Connecting to MQTT broker {}:{}.'.format(args['broker'], args['port']), flush=True)
mqtt_client = mqtt.Client(args['model'])
mqtt_client.connect(args['broker'], args['port'])


@app.get("/")
async def info():
    return "Send a POST request to / with an image\nWill publish results to topic {}".format(args['topic'])


@app.post("/")
async def run(image: UploadFile = File(...)):
    try:
        start = time.time()

        # Read request data
        contents = await image.read()
        image = Image.open(io.BytesIO(contents))

        # Do something with the image
        results = ["todo"]
        print('Process took {} seconds'.format(time.time() - start), flush=True)

        # Publish to MQTT topic
        print('Publish to MQTT {}'.format(args['topic']), flush=True)
        (rc, mid) = mqtt_client.publish(args['topic'], json.dumps(results), qos=2)
        print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))
        #if not rc == mqtt.MQTT_ERR_SUCCESS: print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))

        # Format response
        data = {}
        data['res'] = results
        data['count'] = len(results)
        data['success'] = True
        return data
    except Exception as e:
        # Report any unexpected error to the caller as a 500
        print('Python error with no Exception handler:')
        print('Traceback error: {}'.format(e))
        raise HTTPException(status_code=500, detail=str(e))
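My goal is to publish the results both in the HTTP response and on an MQTT topic.
The MQTT connection seems to work.
When I send a request to the web server, I get the following logs: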
INFO: Will watch for changes in these directories: ['/app']
INFO: Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO: Started reloader process [1] using statreload
Connecting to MQTT broker 192.168.1.201:1883.
INFO: Started server process [7]
INFO: Waiting for application startup.
INFO: Application startup complete.
Process took 0.025616168975830078 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 1: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.023961544036865234 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 2: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.031525611877441406 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 3: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
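Publishing seems to go fine, but I never receive any messages (whereas it works when run directly from the command line).
The server is started with the command: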
uvicorn server:app --reload --port 80 --host 0.0.0.0
How can I run the publish from another thread?
You have not started the MQTT client network loop.
You should probably add mqtt_client.loop_start() after the call to mqtt_client.connect().
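A minimal sketch of that change, not the asker's exact code. The helper names start_mqtt and stop_mqtt are hypothetical. connect(), loop_start(), disconnect() and loop_stop() are the paho-mqtt client methods. As far as I understand it, publish() returning rc 0 only means the client accepted the message. The QoS 2 handshake with the broker cannot finish until something is servicing the socket, and the background thread started by loop_start() does that.

```python
def start_mqtt(client, broker, port):
    """Connect a paho-mqtt client and start its background network thread.

    `client` is expected to be a paho.mqtt.client.Client instance
    (hypothetical helper; only connect() and loop_start() are paho calls).
    """
    client.connect(broker, port)
    # loop_start() runs the network loop in a separate thread, so incoming
    # and outgoing packets are handled while the web server serves requests.
    client.loop_start()
    return client


def stop_mqtt(client):
    """Disconnect and stop the background network thread (hypothetical helper)."""
    client.disconnect()
    client.loop_stop()
```

In the server above this would replace the bare connect() call, for example start_mqtt(mqtt_client, args['broker'], args['port']) right after the client is created, with stop_mqtt(mqtt_client) called when the application shuts down.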