Paho MQTT on uvicorn server

I have a very simple FastAPI Python server:

import io
import os
import sys
import json
import time
from PIL import Image
import paho.mqtt.client as mqtt
from fastapi import FastAPI, File, HTTPException, UploadFile, Form

# Initialize FastAPI
app = FastAPI()

# Initialize ENV variables
args = {
    'broker'    : os.environ.get('BROKER', '127.0.0.1'),
    'port'      : int(os.environ.get('PORT', '1883')),
    'topic'     : os.environ.get('TOPIC', 'topic')
}
    
# Initialize MQTT
print('Connecting to MQTT broker {}:{}.'.format(args['broker'], args['port']), flush=True)
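# Note: the original passed args['model'] as the client ID, but 'model' is not
# defined in args above (KeyError); with no ID, paho generates one.
mqtt_client = mqtt.Client()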
mqtt_client.connect(args['broker'], args['port'])

@app.get("/")
async def info():
    return "Send a POST request to / with an image\nWill publish results to topic {}".format(args['topic'])
    
@app.post("/")
async def run(image: UploadFile = File(...)):
    try:
        start = time.time()
        
        # Read request data
        contents = await image.read()
        image = Image.open(io.BytesIO(contents))

        # Do something with the image
        results = ["todo"]
        print('Process took {} seconds'.format(time.time() - start), flush=True)
        
        # Publish to MQTT topic
        print('Publish to MQTT {}'.format(args['topic']), flush=True)
        (rc, mid) = mqtt_client.publish(args['topic'], json.dumps(results), qos=2)
        print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))
        #if not rc == mqtt.MQTT_ERR_SUCCESS: print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))

        # Format response
        data = {}
        data['res'] = results
        data['count'] = len(results)
        data['success'] = True
        return data
    except Exception as e:
        print('Python error with no Exception handler:')
        print('Traceback error: {}'.format(e))
        raise HTTPException(status_code=500, detail=str(e))

My goal is to publish the results both in the HTTP response and on an MQTT topic. The MQTT connection seems to work fine.

When I send requests to the web server, I get the following log:

INFO:     Will watch for changes in these directories: ['/app']
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO:     Started reloader process [1] using statreload    
Connecting to MQTT broker 192.168.1.201:1883.
INFO:     Started server process [7]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Process took 0.025616168975830078 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 1: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.023961544036865234 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 2: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.031525611877441406 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 3: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK

Publishing seems to go fine, but I do not receive any messages (whereas it works when run directly from the command line).

The server is started with the command:

uvicorn server:app --reload --port 80 --host 0.0.0.0

How can I run publish from another thread?

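You have not started the MQTT client's network loop.

You should probably add mqtt_client.loop_start() after the call to mqtt_client.connect(). Without a running loop the client never processes the broker's replies, so the QoS 2 handshake cannot complete even though publish() returns code 0.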
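A minimal sketch of that change, assuming a paho-mqtt client object such as the mqtt_client from the question. The helper names start_mqtt and stop_mqtt are hypothetical and only there to keep the example self-contained; the calls they make (connect, loop_start, disconnect, loop_stop) are the paho client methods.

```python
def start_mqtt(client, broker, port):
    """Connect the client and start paho's background network loop.

    `client` is assumed to be a paho.mqtt.client.Client instance.
    """
    client.connect(broker, port)
    # loop_start() runs the network loop in a background thread. That thread
    # reads the broker's replies (needed for the QoS 1/2 handshakes) and
    # sends keepalive pings, so publish() calls made from request handlers
    # can actually be delivered.
    client.loop_start()
    return client


def stop_mqtt(client):
    """Disconnect cleanly and stop the background loop thread."""
    client.disconnect()
    client.loop_stop()


# Usage in the server from the question (replaces the bare connect() call):
#   mqtt_client = start_mqtt(mqtt_client, args['broker'], args['port'])
# and, when the application shuts down:
#   stop_mqtt(mqtt_client)
```

The handler code itself does not need to change: publish() can still be called from the request handler, and the loop thread takes care of the network traffic.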