没有背景对象时,如何使用 Starlette 的后台任务?
How to use background tasks with Starlette when there's no background object?
我希望目前避免使用任何 Celery。在 Starlette 的文档中,他们提供了两种添加后台任务的方法:
通过石墨烯:https://www.starlette.io/graphql/
class Query(graphene.ObjectType):
user_agent = graphene.String()
def resolve_user_agent(self, info):
"""
Return the User-Agent of the incoming request.
"""
user_agent = request.headers.get("User-Agent", "<unknown>")
background = info.context["background"]
background.add_task(log_user_agent, user_agent=user_agent)
return user_agent
通过 JSON 响应:https://www.starlette.io/background/
async def signup(request):
data = await request.json()
username = data['username']
email = data['email']
task = BackgroundTask(send_welcome_email, to_address=email)
message = {'status': 'Signup successful'}
return JSONResponse(message, background=task)
有谁知道用 Ariadne 向 Starlette 的后台添加任务的方法吗?我无法在我的解析器中 return JSON 响应,并且我无法访问 info.context["background"]。我唯一附加到我的上下文的是我的请求对象。
已解决!
Starlette 中间件:
class BackgroundTaskMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
request.state.background = None
response = await call_next(request)
if request.state.background:
response.background = request.state.background
return response
阿里阿德涅解析器:
@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
task = BackgroundTasks()
task.add_task(test_func)
task.add_task(testing_func_two, "I work now")
request = info.context["request"]
request.state.background = task
return True
async def test_func():
await asyncio.sleep(10)
print("once!!")
async def testing_func_two(message: str):
print(message)
函数仍然同步执行,但因为它们是后台任务,所以我不太担心。
上面标记为解决方案的内容对我不起作用,因为当您使用子类 BaseHTTPMiddleware 的中间件时 BackgroundTask 无法正常工作,请参见此处:
https://github.com/encode/starlette/issues/919
在我的情况下,基本上任务不在后台 运行,等待完成,我也没有使用 Ariadne,但这应该让你完成工作 运行后台任务
编辑:
这对我有用。
executor = ProcessPoolExecutor()
main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)
executor.shutdown()
logger.info("Process Pool Shutdown")
我希望目前避免使用任何 Celery。在 Starlette 的文档中,他们提供了两种添加后台任务的方法:
通过石墨烯:https://www.starlette.io/graphql/
class Query(graphene.ObjectType):
user_agent = graphene.String()
def resolve_user_agent(self, info):
"""
Return the User-Agent of the incoming request.
"""
user_agent = request.headers.get("User-Agent", "<unknown>")
background = info.context["background"]
background.add_task(log_user_agent, user_agent=user_agent)
return user_agent
通过 JSON 响应:https://www.starlette.io/background/
async def signup(request):
data = await request.json()
username = data['username']
email = data['email']
task = BackgroundTask(send_welcome_email, to_address=email)
message = {'status': 'Signup successful'}
return JSONResponse(message, background=task)
有谁知道用 Ariadne 向 Starlette 的后台添加任务的方法吗?我无法在我的解析器中 return JSON 响应,并且我无法访问 info.context["background"]。我唯一附加到我的上下文的是我的请求对象。
已解决!
Starlette 中间件:
class BackgroundTaskMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
request.state.background = None
response = await call_next(request)
if request.state.background:
response.background = request.state.background
return response
阿里阿德涅解析器:
@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
task = BackgroundTasks()
task.add_task(test_func)
task.add_task(testing_func_two, "I work now")
request = info.context["request"]
request.state.background = task
return True
async def test_func():
await asyncio.sleep(10)
print("once!!")
async def testing_func_two(message: str):
print(message)
函数仍然同步执行,但因为它们是后台任务,所以我不太担心。
上面标记为解决方案的内容对我不起作用,因为当您使用子类 BaseHTTPMiddleware 的中间件时 BackgroundTask 无法正常工作,请参见此处:
https://github.com/encode/starlette/issues/919
在我的情况下,基本上任务不在后台 运行,等待完成,我也没有使用 Ariadne,但这应该让你完成工作 运行后台任务
编辑: 这对我有用。
executor = ProcessPoolExecutor()
main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)
executor.shutdown()
logger.info("Process Pool Shutdown")