django 通道竞争条件
django channels race condition
我正在使用 Django 通道来处理 websocket 连接。如果两个客户端同时发送两条消息,我会使用通道的 WebsocketConsumer
获得竞争条件。我假设发生这种情况是因为与该消费者的每个连接都启动了自己的线程,然后与其他线程并行处理。所以我想我会切换到 AsyncWebsocketConsumer
。我把 async
、await
、database_sync_to_async
等放在必要的地方,一切正常,但竞争条件问题仍然存在。我想通过使用 AsyncWebsocketConsumer
,每个连接都将在同一个线程中处理,而调用 async def receive(...)
会阻塞线程,以便按顺序处理每个接收到的消息。我做错了什么?
Channels 将为您的消费者创建一个新实例,并为该实例创建一个相应的 运行-loop。当您 await database_sync_to_async
在单个线程上 运行ning 时,该消费者的 运行 循环将让出让另一个消费者做一些工作..
如果你想在连接之间进行同步,你应该通过数据库或使用通道层来做到这一点,但你应该假设消息可以随时到达(就像你有一个常规的 HTTP 端点一样)。
我现在正在使用 pg 咨询锁,请参阅 https://github.com/Xof/django-pglocks。
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django_pglocks import advisory_lock
class Consumer(AsyncWebsocketConsumer):
async def receive(self, text_data=None, bytes_data=None):
text_data = json.loads(text_data)
await database_sync_to_async(self.do_stuff)(text_data)
def do_stuff(self, text_data):
with advisory_lock("a lock"):
stuff = Stuff.objects.get(pk=text_data["stuff_id"])
# do things
stuff.save()
编辑:因为我需要很多锁,而 postgresql 只允许大约一百个连接,所以我切换到 python-redis-lock。最大连接数现在仅受可用 ulimit 设置的文件描述符的限制。
我正在使用 Django 通道来处理 websocket 连接。如果两个客户端同时发送两条消息,我会使用通道的 WebsocketConsumer
获得竞争条件。我假设发生这种情况是因为与该消费者的每个连接都启动了自己的线程,然后与其他线程并行处理。所以我想我会切换到 AsyncWebsocketConsumer
。我把 async
、await
、database_sync_to_async
等放在必要的地方,一切正常,但竞争条件问题仍然存在。我想通过使用 AsyncWebsocketConsumer
,每个连接都将在同一个线程中处理,而调用 async def receive(...)
会阻塞线程,以便按顺序处理每个接收到的消息。我做错了什么?
Channels 将为您的消费者创建一个新实例,并为该实例创建一个相应的 运行-loop。当您 await database_sync_to_async
在单个线程上 运行ning 时,该消费者的 运行 循环将让出让另一个消费者做一些工作..
如果你想在连接之间进行同步,你应该通过数据库或使用通道层来做到这一点,但你应该假设消息可以随时到达(就像你有一个常规的 HTTP 端点一样)。
我现在正在使用 pg 咨询锁,请参阅 https://github.com/Xof/django-pglocks。
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django_pglocks import advisory_lock
class Consumer(AsyncWebsocketConsumer):
async def receive(self, text_data=None, bytes_data=None):
text_data = json.loads(text_data)
await database_sync_to_async(self.do_stuff)(text_data)
def do_stuff(self, text_data):
with advisory_lock("a lock"):
stuff = Stuff.objects.get(pk=text_data["stuff_id"])
# do things
stuff.save()
编辑:因为我需要很多锁,而 postgresql 只允许大约一百个连接,所以我切换到 python-redis-lock。最大连接数现在仅受可用 ulimit 设置的文件描述符的限制。