使用 Flask 在后台异步作业之上模拟同步请求

Simulate a synchronous request on top of background async job with Flask

我会先解释一下我的系统架构,然后转到问题:

我有一个 REST API 用作我的 API 网关。该服务器是使用 Flask 构建的。我还有 RabbitMQ 集群,以及我编写的客户端,它监听特定队列并执行其获取的任务。

到目前为止,我的所有请求都是异步的,所以一旦请求到达 API 网关,callback_uri 字段 URL 到 POST结果作为请求的一部分提供,API 网关只负责将任务发送到 RabbitMQ,工作人员处理任务,最后 POST 将结果返回回调 URL.

我的问题是:

我希望新端点在某种意义上是同步的,即处理仍将由与以前相同的工作人员完成,但我想将结果返回到 API 网关 return 给用户并释放连接。

我目前的解决方案:

我像以前一样将唯一的 callback_uri 作为请求的一部分发送给工作人员,但现在特定端点由我的 API 网关实现并允许 POST和 GET 方法,因此工作人员可以在完成后 POST 结果,并且我的 API 网关不断轮询回调 URL 直到结果可用,然后 return 结果给客户。

除了让忙碌等待的 HTTP 工作者轮询自己的端点以获取结果之外,还有其他首选选项吗?但仍然是同步的,因此只有在结果可用时才释放连接?

代码仅供说明:

@app.route('/long_task', methods=['POST'])
@sync_request
def long_task():
    try:
        if request.get_json() is None:
            return ERROR_MSG_NO_JSON, 400
        create_and_send_request_to_rabbitmq()
        return '', 200
    except Exception as ex:
        return ERROR_MSG_NO_DATA, 400


def sync_request(func):

    def call(*args, **kwargs):
        create_callback_uri()
        result = func(*args, **kwargs)
        status_code = result[1]
        if status_code == 200:
            result = get_callback_result()
        return result

    return call

def get_callback_result():
    callback_uri = request.get_json()['callback_uri']
    has_answer = False
    headers = {'content-type': 'application/json'}
    empty_response = {}
    content = json.dumps(empty_response)

    try:
        with Timeout(seconds=SYNC_REQUEST_TIMEOUT_SECONDS):
            while not has_answer:
                response = requests.get(callback_uri, headers=headers)
                if response.status_code == 200:
                    has_answer = True
                    content = response.content
                else:
                    time.sleep(0.2)
    except TimeoutException:
        log.debug('Timed out on sync request for request %s ' % request)

    return content, 200

因此,如果我对您的理解正确,您希望后端等待某个工作人员的响应(通过 RabbitMQ)。您可以通过实施 rpc over rabbitmq 来实现。关键思想是使用关联 ID。

但绝对最有效的方法是 运行 客户端通过 websockets(如果不是浏览器,则为原始 tcp 套接字)并在工作完成时直接通知他。这样您就不会锁定资源(客户端连接、rabbitmq 队列)并且避免性能下降 (rpc)。