使用 Flask 在后台异步作业之上模拟同步请求
Simulate a synchronous request on top of background async job with Flask
我会先解释一下我的系统架构,然后转到问题:
我有一个 REST API 用作我的 API 网关。该服务器是使用 Flask 构建的。我还有 RabbitMQ 集群,以及我编写的客户端,它监听特定队列并执行其获取的任务。
到目前为止,我的所有请求都是异步的,所以一旦请求到达 API 网关,callback_uri
字段 URL 到 POST结果作为请求的一部分提供,API 网关只负责将任务发送到 RabbitMQ,工作人员处理任务,最后 POST 将结果返回回调 URL.
我的问题是:
我希望新端点在某种意义上是同步的,即处理仍将由与以前相同的工作人员完成,但我想将结果返回到 API 网关 return 给用户并释放连接。
我目前的解决方案:
我像以前一样将唯一的 callback_uri
作为请求的一部分发送给工作人员,但现在特定端点由我的 API 网关实现并允许 POST和 GET 方法,因此工作人员可以在完成后 POST 结果,并且我的 API 网关不断轮询回调 URL 直到结果可用,然后 return 结果给客户。
除了让忙碌等待的 HTTP 工作者轮询自己的端点以获取结果之外,还有其他首选选项吗?但仍然是同步的,因此只有在结果可用时才释放连接?
代码仅供说明:
@app.route('/long_task', methods=['POST'])
@sync_request
def long_task():
try:
if request.get_json() is None:
return ERROR_MSG_NO_JSON, 400
create_and_send_request_to_rabbitmq()
return '', 200
except Exception as ex:
return ERROR_MSG_NO_DATA, 400
def sync_request(func):
def call(*args, **kwargs):
create_callback_uri()
result = func(*args, **kwargs)
status_code = result[1]
if status_code == 200:
result = get_callback_result()
return result
return call
def get_callback_result():
callback_uri = request.get_json()['callback_uri']
has_answer = False
headers = {'content-type': 'application/json'}
empty_response = {}
content = json.dumps(empty_response)
try:
with Timeout(seconds=SYNC_REQUEST_TIMEOUT_SECONDS):
while not has_answer:
response = requests.get(callback_uri, headers=headers)
if response.status_code == 200:
has_answer = True
content = response.content
else:
time.sleep(0.2)
except TimeoutException:
log.debug('Timed out on sync request for request %s ' % request)
return content, 200
因此,如果我对您的理解正确,您希望后端等待某个工作人员的响应(通过 RabbitMQ)。您可以通过实施 rpc over rabbitmq 来实现。关键思想是使用关联 ID。
但绝对最有效的方法是 运行 客户端通过 websockets(如果不是浏览器,则为原始 tcp 套接字)并在工作完成时直接通知他。这样您就不会锁定资源(客户端连接、rabbitmq 队列)并且避免性能下降 (rpc)。
我会先解释一下我的系统架构,然后转到问题:
我有一个 REST API 用作我的 API 网关。该服务器是使用 Flask 构建的。我还有 RabbitMQ 集群,以及我编写的客户端,它监听特定队列并执行其获取的任务。
到目前为止,我的所有请求都是异步的,所以一旦请求到达 API 网关,callback_uri
字段 URL 到 POST结果作为请求的一部分提供,API 网关只负责将任务发送到 RabbitMQ,工作人员处理任务,最后 POST 将结果返回回调 URL.
我的问题是:
我希望新端点在某种意义上是同步的,即处理仍将由与以前相同的工作人员完成,但我想将结果返回到 API 网关 return 给用户并释放连接。
我目前的解决方案:
我像以前一样将唯一的 callback_uri
作为请求的一部分发送给工作人员,但现在特定端点由我的 API 网关实现并允许 POST和 GET 方法,因此工作人员可以在完成后 POST 结果,并且我的 API 网关不断轮询回调 URL 直到结果可用,然后 return 结果给客户。
除了让忙碌等待的 HTTP 工作者轮询自己的端点以获取结果之外,还有其他首选选项吗?但仍然是同步的,因此只有在结果可用时才释放连接?
代码仅供说明:
@app.route('/long_task', methods=['POST'])
@sync_request
def long_task():
try:
if request.get_json() is None:
return ERROR_MSG_NO_JSON, 400
create_and_send_request_to_rabbitmq()
return '', 200
except Exception as ex:
return ERROR_MSG_NO_DATA, 400
def sync_request(func):
def call(*args, **kwargs):
create_callback_uri()
result = func(*args, **kwargs)
status_code = result[1]
if status_code == 200:
result = get_callback_result()
return result
return call
def get_callback_result():
callback_uri = request.get_json()['callback_uri']
has_answer = False
headers = {'content-type': 'application/json'}
empty_response = {}
content = json.dumps(empty_response)
try:
with Timeout(seconds=SYNC_REQUEST_TIMEOUT_SECONDS):
while not has_answer:
response = requests.get(callback_uri, headers=headers)
if response.status_code == 200:
has_answer = True
content = response.content
else:
time.sleep(0.2)
except TimeoutException:
log.debug('Timed out on sync request for request %s ' % request)
return content, 200
因此,如果我对您的理解正确,您希望后端等待某个工作人员的响应(通过 RabbitMQ)。您可以通过实施 rpc over rabbitmq 来实现。关键思想是使用关联 ID。
但绝对最有效的方法是 运行 客户端通过 websockets(如果不是浏览器,则为原始 tcp 套接字)并在工作完成时直接通知他。这样您就不会锁定资源(客户端连接、rabbitmq 队列)并且避免性能下降 (rpc)。