Python run_in_executor 忘了?

Python run_in_executor and forget?

如何在执行程序中将阻塞函数设置为 运行,结果无关紧要,因此主线程不应该等待或被它减慢。

老实说,我不确定这是否是正确的解决方案,我想要的只是将某种类型的处理队列与主进程分开,这样它就不会阻止服务器应用程序返回请求,因为这种类型的 Web 服务器 运行 一个工作人员处理许多请求。

我最好远离像 Celery 这样的解决方案,但如果那是最佳方案,我愿意学习它。

这里的上下文是一个异步网络服务器,它生成带有大图像的 pdf 文件。

app = Sanic()
#App "global" worker
executor = ProcessPoolExecutor(max_workers=5)

app.route('/')
async def getPdf(request):
  asyncio.create_task(renderPdfsInExecutor(request.json))
  #This should be returned "instantly" regardless of pdf generation time
  return response.text('Pdf being generated, it will be sent to your email when done')

async def renderPdfsInExecutor(json):
  asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)

def syncRenderPdfs(json)
  #Some PDF Library that downloads images synchronously
  pdfs = somePdfLibrary.generatePdfsFromJson(json)
  sendToDefaultMail(pdfs)

以上代码报错(没错,是运行ning as admin):

PermissionError [WinError 5] Access denied
Future exception was never retrieved

额外问题:通过 运行在执行器中设置一个异步循环,我能得到什么吗?因此,如果它同时处理多个 PDF 请求,它将在它们之间分配处理。如果是,我该怎么做?

好吧,那么首先有一个误会。这个

async def getPdf(request):
    asyncio.create_task(renderPdfsInExecutor(request.json))
    ...

async def renderPdfsInExecutor(json):
    asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)

是多余的。够做

async def getPdf(request):
    asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, request.json)
    ...

或者(因为你不想等待)甚至更好

async def getPdf(request):
    executor.submit(syncRenderPdfs, request.json)
    ...

现在你遇到的问题是因为 syncRenderPdfs 抛出 PermissionError。它没有被处理,所以 Python 警告你 "Hey, some background code threw an error. But the code is not owned by anyone so what the heck?"。这就是为什么你得到 Future exception was never retrievedpdf 库本身有问题,与 asyncio 无关。一旦你解决了这个内部问题,安全也是一个好主意:

def syncRenderPdfs(json)
    try:
        #Some PDF Library that downloads images synchronously
        pdfs = somePdfLibrary.generatePdfsFromJson(json)
        sendToDefaultMail(pdfs)
    except Exception:
        logger.exception('Something went wrong')  # or whatever

你的 "permission denied" 问题完全不同,你应该调试它 and/or post 一个单独的问题。

至于最后一个问题:是的,executor会在worker之间排队并平均分配任务。

编辑: 正如我们在评论中所说,实际问题可能出在您工作的 Windows 环境中。或者更准确地说,使用 ProcessPoolExecutor,即生成进程可能会更改权限。我建议使用 ThreadPoolExecutor,假设它在平台上运行良好。

您可以并行查看 asyncio.gather(*tasks) 到 运行 多个。

请记住,并行任务只有在 io 绑定且不阻塞时才能正常工作。

来自 python 文档 (https://docs.python.org/3/library/asyncio-task.html) 的示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())