Django ORM 和异步
Django ORM and Async
所以,我正在尝试在 django 命令内部创建一个轮询系统,以供娱乐和学习 async/django。
我正在使用 django_tenants,虽然它不是特别重要。这个想法是有一个 table 持有“租户”。我想在更高的无限循环中循环遍历这些租户。对于每个尚未在异步队列中的租户(可能是因为上次找到该租户的过程尚未完成),我想添加一个新的异步任务 运行.
我的测试用例有一个租户,我等待 5 秒以确保内部 queue/loop 正常工作。
想法是,假设有 3 个租户:t_a、t_b 和 t_c,并且 t_a 和 t_c 取 2 和 1秒分别为 运行,t_b 占 5.
我最终会像这样看到无限循环 运行ning:
t_c 完成
t_c 完成
t_a完成
t_c 完成
t_c 完成
t_a完成
t_c 完成
t_b完成
t_c 完成 --- 长 运行ning 一个
t_a完成
....
因此,t_c 不会阻止其他租户被重新运行。
租户的每次迭代都会获取新数据,因为租户可能会在 运行 秒之间创建。
class Command(BaseCommand):
help = 'Processes the webhook queue'
def __init__(self):
self.tenant_queue = []
super().__init__()
def add_arguments(self, parser):
parser.add_argument(
'--schema',
action='store_true',
help='Process on a single schema',
)
async def get_tenant_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 't_b':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.tenant_queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.tenant_queue.pop(self.tenant_queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
return Tenant.objects.all()
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for t in tenants:
if t.id not in self.tenant_queue:
self.tenant_queue.append(t.id)
processing_schemas = [self.get_tenant_queue_count(t) for t in tenants if t.id not in self.tenant_queue]
await asyncio.gather(*processing_schemas)
asyncio.run(_main_routine())
现在每次都会死
django.core.exceptions.SynchronousOnlyOperation:您不能从异步上下文中调用它 - 使用线程或 sync_to_async.
File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
for t in tenants:
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
self._fetch_all()
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
cursor = self.connection.cursor()
File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
我应该如何遍历租户?
啊啊啊想通了...为了防止对 QuerySet 的操作使 ORM 尝试和 运行 异步函数中的同步操作,我需要将 QuerySet 转换为 [= 中的列表19=]函数
async def get_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 'company_xyz':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.queue.pop(self.queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
# THIS LINE WRAPPED IN LIST
return list(Tenant.objects.all())
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for tenant in tenants:
if tenant.id not in self.queue:
self.queue.append(tenant.id)
asyncio.create_task(self.get_queue_count(tenant))
asyncio.run(_main_routine())
然后为了无限期地 运行 和 non-blocking,我只是在循环中一个接一个地创建任务,并使用简单的整数检查来跟踪它们 self.queue
现在一切正常!希望这对某人有所帮助。
所以,我正在尝试在 django 命令内部创建一个轮询系统,以供娱乐和学习 async/django。
我正在使用 django_tenants,虽然它不是特别重要。这个想法是有一个 table 持有“租户”。我想在更高的无限循环中循环遍历这些租户。对于每个尚未在异步队列中的租户(可能是因为上次找到该租户的过程尚未完成),我想添加一个新的异步任务 运行.
我的测试用例有一个租户,我等待 5 秒以确保内部 queue/loop 正常工作。
想法是,假设有 3 个租户:t_a、t_b 和 t_c,并且 t_a 和 t_c 取 2 和 1秒分别为 运行,t_b 占 5.
我最终会像这样看到无限循环 运行ning:
t_c 完成 t_c 完成 t_a完成 t_c 完成 t_c 完成 t_a完成 t_c 完成 t_b完成 t_c 完成 --- 长 运行ning 一个 t_a完成 ....
因此,t_c 不会阻止其他租户被重新运行。
租户的每次迭代都会获取新数据,因为租户可能会在 运行 秒之间创建。
class Command(BaseCommand):
help = 'Processes the webhook queue'
def __init__(self):
self.tenant_queue = []
super().__init__()
def add_arguments(self, parser):
parser.add_argument(
'--schema',
action='store_true',
help='Process on a single schema',
)
async def get_tenant_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 't_b':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.tenant_queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.tenant_queue.pop(self.tenant_queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
return Tenant.objects.all()
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for t in tenants:
if t.id not in self.tenant_queue:
self.tenant_queue.append(t.id)
processing_schemas = [self.get_tenant_queue_count(t) for t in tenants if t.id not in self.tenant_queue]
await asyncio.gather(*processing_schemas)
asyncio.run(_main_routine())
现在每次都会死
django.core.exceptions.SynchronousOnlyOperation:您不能从异步上下文中调用它 - 使用线程或 sync_to_async.
File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
for t in tenants:
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
self._fetch_all()
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
cursor = self.connection.cursor()
File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
我应该如何遍历租户?
啊啊啊想通了...为了防止对 QuerySet 的操作使 ORM 尝试和 运行 异步函数中的同步操作,我需要将 QuerySet 转换为 [= 中的列表19=]函数
async def get_queue_count(self, tenant):
with tenant_context(tenant):
count = await sync_to_async(Queue.objects.count)()
if tenant.schema_name == 'company_xyz':
random_int = 10
else:
random_int = randint(1, 3)
await asyncio.sleep(random_int)
print(self.queue)
print(f'{tenant.name}: {count}, time: {random_int}')
self.queue.pop(self.queue.index(tenant.id))
def handle(self, *args, **options):
def _get_tenants():
# THIS LINE WRAPPED IN LIST
return list(Tenant.objects.all())
async def _main_routine():
while True:
tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
for tenant in tenants:
if tenant.id not in self.queue:
self.queue.append(tenant.id)
asyncio.create_task(self.get_queue_count(tenant))
asyncio.run(_main_routine())
然后为了无限期地 运行 和 non-blocking,我只是在循环中一个接一个地创建任务,并使用简单的整数检查来跟踪它们 self.queue
现在一切正常!希望这对某人有所帮助。