没有背景对象时,如何使用 Starlette 的后台任务?

How to use background tasks with Starlette when there's no background object?

我希望目前避免使用任何 Celery。在 Starlette 的文档中,他们提供了两种添加后台任务的方法:

通过石墨烯:https://www.starlette.io/graphql/

class Query(graphene.ObjectType):
    user_agent = graphene.String()

    def resolve_user_agent(self, info):
        """
        Return the User-Agent of the incoming request.
        """
        user_agent = request.headers.get("User-Agent", "<unknown>")
        background = info.context["background"]
        background.add_task(log_user_agent, user_agent=user_agent)
        return user_agent

通过 JSON 响应:https://www.starlette.io/background/

async def signup(request):
    data = await request.json()
    username = data['username']
    email = data['email']
    task = BackgroundTask(send_welcome_email, to_address=email)
    message = {'status': 'Signup successful'}
    return JSONResponse(message, background=task)

有谁知道用 Ariadne 向 Starlette 的后台添加任务的方法吗?我无法在我的解析器中 return JSON 响应,并且我无法访问 info.context["background"]。我唯一附加到我的上下文的是我的请求对象。

已解决!

Starlette 中间件:

class BackgroundTaskMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.state.background = None
        response = await call_next(request)
        if request.state.background:
            response.background = request.state.background
        return response

阿里阿德涅解析器:

@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
    task = BackgroundTasks()
    task.add_task(test_func)
    task.add_task(testing_func_two, "I work now")
    request = info.context["request"]
    request.state.background = task
    return True


async def test_func():
    await asyncio.sleep(10)
    print("once!!")


async def testing_func_two(message: str):
    print(message)

函数仍然同步执行,但因为它们是后台任务,所以我不太担心。

More discussion here.

上面标记为解决方案的内容对我不起作用,因为当您使用子类 BaseHTTPMiddleware 的中间件时 BackgroundTask 无法正常工作,请参见此处:

https://github.com/encode/starlette/issues/919

在我的情况下,基本上任务不在后台 运行,等待完成,我也没有使用 Ariadne,但这应该让你完成工作 运行后台任务

编辑: 这对我有用。

executor = ProcessPoolExecutor()

main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)

executor.shutdown()
logger.info("Process Pool Shutdown")