Flask 中的线程任务队列
Threading Tasks Queue in Flask
大家好,我正在使用 flask 构建一个 websocket,我目前正在为每个请求创建一个新线程,这样线程就可以进行一些繁重的处理,而不需要花费太多时间来 return 来自要求。问题是在某些时候我打开了太多线程,它开始引起问题,我想知道我是否可以在烧瓶中设置一些队列来限制应用程序每次只能创建 8 个线程。
我的代码:
def process_audio(bucket_name, key, _id, extension):
S3_CLIENT = boto3.client('s3', region_name=S3_REGION)
print('Running audio proccessing')
INPUT_FILE = os.path.join(TEMP_PATH, f'{_id}.{extension}')
print(f'Saving downloaded file to {INPUT_FILE}')
S3_CLIENT.download_file(bucket_name, key, INPUT_FILE)
print('File downloaded')
process = stt.process_audio(INPUT_FILE)
print(f'Audio processed by AI returned: "{process}"')
stt.reset()
ai = get_sentimentAI_results(process)
if ai:
print(f'Text processed by AI returned class {ai[0]} with a certainty of {ai[1]}%')
return True
print('Request to sentiment AI endpoint failed for an unkown reason. Check CloudWhatch for more information!')
return False
@app.route('/process/audio', methods=['POST'])
def process_new_audio():
print('Recieving new request')
data = request.data
if not data:
return '', 404
data = json.loads(data)
bucket_name = data.get('bucket_name')
key = data.get('key')
_id = data.get('id')
extension = data.get('file_extension')
if not key or not bucket_name or not _id or not extension:
return '', 404
thread = Thread(target=process_audio, kwargs={'bucket_name': bucket_name, 'key': key, '_id': _id, 'extension': extension})
thread.start()
return '', 200
问题恢复:
这个 flask 函数用作由 AWS lambda 触发的 webhook,它创建一个线程来处理数据,而不需要让 lambda 等待它完成。我只需要一种创建队列的方法,以防发出 100 个请求我没有 100 个线程 运行 但每次只有 5 个线程
这是多线程时一个很常见的问题,这里需要的解决方案称为生产者-消费者模型,其中有一个生产者(创建工作的实体)然后有一个(线程安全)队列,其中这个工作被推入,然后有消费者(工作线程)从队列中一个接一个地弹出工作,直到队列为空。
这样做会限制工作线程的数量。
一种巧妙的方法是使用 python 中可用的 concurrent.futures 库。 @aaron 给出了一个合适的 link。
我处理过一个类似的服务,我们通过向我们的 flask 服务器添加一个 Celery 应用程序来解决任务排队问题。获取请求的线程,生成一个 Celery 任务,并在 Celery 应用程序有足够资源时将其排队等待应用程序处理。
我们遵循了本教程:
https://flask.palletsprojects.com/en/2.0.x/patterns/celery/
如果您在执行此操作时遇到任何问题,请随时询问您的进度。
大家好,我正在使用 flask 构建一个 websocket,我目前正在为每个请求创建一个新线程,这样线程就可以进行一些繁重的处理,而不需要花费太多时间来 return 来自要求。问题是在某些时候我打开了太多线程,它开始引起问题,我想知道我是否可以在烧瓶中设置一些队列来限制应用程序每次只能创建 8 个线程。
我的代码:
def process_audio(bucket_name, key, _id, extension):
S3_CLIENT = boto3.client('s3', region_name=S3_REGION)
print('Running audio proccessing')
INPUT_FILE = os.path.join(TEMP_PATH, f'{_id}.{extension}')
print(f'Saving downloaded file to {INPUT_FILE}')
S3_CLIENT.download_file(bucket_name, key, INPUT_FILE)
print('File downloaded')
process = stt.process_audio(INPUT_FILE)
print(f'Audio processed by AI returned: "{process}"')
stt.reset()
ai = get_sentimentAI_results(process)
if ai:
print(f'Text processed by AI returned class {ai[0]} with a certainty of {ai[1]}%')
return True
print('Request to sentiment AI endpoint failed for an unkown reason. Check CloudWhatch for more information!')
return False
@app.route('/process/audio', methods=['POST'])
def process_new_audio():
print('Recieving new request')
data = request.data
if not data:
return '', 404
data = json.loads(data)
bucket_name = data.get('bucket_name')
key = data.get('key')
_id = data.get('id')
extension = data.get('file_extension')
if not key or not bucket_name or not _id or not extension:
return '', 404
thread = Thread(target=process_audio, kwargs={'bucket_name': bucket_name, 'key': key, '_id': _id, 'extension': extension})
thread.start()
return '', 200
问题恢复:
这个 flask 函数用作由 AWS lambda 触发的 webhook,它创建一个线程来处理数据,而不需要让 lambda 等待它完成。我只需要一种创建队列的方法,以防发出 100 个请求我没有 100 个线程 运行 但每次只有 5 个线程
这是多线程时一个很常见的问题,这里需要的解决方案称为生产者-消费者模型,其中有一个生产者(创建工作的实体)然后有一个(线程安全)队列,其中这个工作被推入,然后有消费者(工作线程)从队列中一个接一个地弹出工作,直到队列为空。
这样做会限制工作线程的数量。 一种巧妙的方法是使用 python 中可用的 concurrent.futures 库。 @aaron 给出了一个合适的 link。
我处理过一个类似的服务,我们通过向我们的 flask 服务器添加一个 Celery 应用程序来解决任务排队问题。获取请求的线程,生成一个 Celery 任务,并在 Celery 应用程序有足够资源时将其排队等待应用程序处理。 我们遵循了本教程:
https://flask.palletsprojects.com/en/2.0.x/patterns/celery/
如果您在执行此操作时遇到任何问题,请随时询问您的进度。