Django ORM 和异步

Django ORM and Async

所以,我正在尝试在 django 命令内部创建一个轮询系统,以供娱乐和学习 async/django。

我正在使用 django_tenants,虽然它不是特别重要。这个想法是有一个 table 持有“租户”。我想在更高的无限循环中循环遍历这些租户。对于每个尚未在异步队列中的租户(可能是因为上次找到该租户的过程尚未完成),我想添加一个新的异步任务 运行.

我的测试用例有一个租户,我等待 5 秒以确保内部 queue/loop 正常工作。

想法是,假设有 3 个租户:t_a、t_b 和 t_c,并且 t_a 和 t_c 取 2 和 1秒分别为 运行,t_b 占 5.

我最终会像这样看到无限循环 运行ning:

t_c 完成 t_c 完成 t_a完成 t_c 完成 t_c 完成 t_a完成 t_c 完成 t_b完成 t_c 完成 --- 长 运行ning 一个 t_a完成 ....

因此,t_c 不会阻止其他租户被重新运行。

租户的每次迭代都会获取新数据,因为租户可能会在 运行 秒之间创建。

class Command(BaseCommand):
    help = 'Processes the webhook queue'

    def __init__(self):
        self.tenant_queue = []
        super().__init__()

    def add_arguments(self, parser):
        parser.add_argument(
            '--schema',
            action='store_true',
            help='Process on a single schema',
        )

    async def get_tenant_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 't_b':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.tenant_queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.tenant_queue.pop(self.tenant_queue.index(tenant.id))

    def handle(self, *args, **options):
        def _get_tenants():
            return Tenant.objects.all()

        async def _main_routine():

            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                for t in tenants:
                    if t.id not in self.tenant_queue:
                        self.tenant_queue.append(t.id) 

                processing_schemas = [self.get_tenant_queue_count(t) for t in tenants if t.id not in self.tenant_queue]

                await asyncio.gather(*processing_schemas)

        asyncio.run(_main_routine())

现在每次都会死

django.core.exceptions.SynchronousOnlyOperation:您不能从异步上下文中调用它 - 使用线程或 sync_to_async.

File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
for t in tenants:
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
self._fetch_all()
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
cursor = self.connection.cursor()
File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

我应该如何遍历租户?

啊啊啊想通了...为了防止对 QuerySet 的操作使 ORM 尝试和 运行 异步函数中的同步操作,我需要将 QuerySet 转换为 [= 中的列表19=]函数


    async def get_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 'company_xyz':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.queue.pop(self.queue.index(tenant.id))

    def handle(self, *args, **options):
        def _get_tenants():
            # THIS LINE WRAPPED IN LIST
            return list(Tenant.objects.all())

        async def _main_routine():

            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                for tenant in tenants:
                    if tenant.id not in self.queue:
                        self.queue.append(tenant.id)
                        asyncio.create_task(self.get_queue_count(tenant))

        asyncio.run(_main_routine())

然后为了无限期地 运行 和 non-blocking,我只是在循环中一个接一个地创建任务,并使用简单的整数检查来跟踪它们 self.queue

现在一切正常!希望这对某人有所帮助。