rabbitMQ 的鼠兔在使用烧瓶服务器时崩溃

pika for rabbitMQ crashing while using flask server

所以我们有一个单线程烧瓶服务器 运行,我们在其中接收来自 python 应用程序客户端的请求。在这个烧瓶服务器中,我们使用带有 pika 库的 rabbitMQ 将消息分发给其他客户端。 发生的事情是在 get 函数中程序崩溃并出现错误:

pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')

我在堆栈溢出和其他方面搜索了很多关于此的主题,但它们都解决了多线程问题,但事实并非如此。 Flask 应该只使用一个线程,除非在 app.run(threaded=yes).

中调用它

当短时间内发送多条消息(例如每秒 5 条)时,程序通常会崩溃,同样重要的是要注意每秒都会收到消息并请求此功能:

@app.route('/api/users/getMessages', methods=['POST'])  
def get_Messages():  
    data = json.loads(request.data)
    token = data['token']

    payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
    istid = payload['istid']
    print('istid: '+istid)

    messages = []

    queue = channel.queue_declare(queue=istid)
    for i in range(queue.method.message_count):
        method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
        if method_frame:
            #print(method_frame, header_frame, body)
            messages.append(body)
        else:
            print('No message returned')

    res = {'messages':messages, 'error':0}
    return jsonify(res)

在此代码中,它在以下行中正常崩溃:

queue = channel.queue_declare(queue=istid)

但我们也尝试将代码更改为使用 while 而不是 for 当正文为 None 并且它在行中崩溃时结束的位置:
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True) 在这种情况下。 同样重要的是,崩溃是随机的,它可以工作几次,然后在发送消息时收到请求后随机崩溃。如果有人知道与此相关的任何信息,我们将不胜感激。

另一个注意事项,我们考虑过使用 basic_consume 和回调而不是 basic_get 但我们没有找到可行的方法,因为我们必须发回消息并有几个用户向同一功能发出请求。

编辑#1: 在 rabbitMQ 文档 rabbitmq 中,如果您搜索函数 "def basic_get",您会注意到有一些 TODO 注释以及对此

的引用

Due to implementation details, this cannot be called a second time until the callback is executed.

所以我怀疑这可能是正在发生的事情,但即使是我也不知道如何解决。

对于任何对此解决方案感兴趣的人,就像在其他评论中一样,该程序不是线程安全的,因为从 1.0 版开始,flask 默认使用 threaded = True。
解决方案是:
1) 运行 烧瓶 app.run(threaded = False)
2) 通过在使用 pika 访问通道/连接时实施锁定来确保程序线程安全。