将数据生产者作为工作者集成到 Django Channels 2.x
Integrating a data producer as worker to Django Channels 2.x
我正在开发一个应用程序,其中推送给客户端的实时数据将来自外部 API。它的一个简单版本可以被认为是一个外汇货币追踪器。用户将指定她想要跟踪的货币(美元、欧元、英镑等)并接收实时更新。货币数据将通过长轮询来自外部 API。我的问题是如何将这个数据生产者整合到渠道中?
在所有通道示例中,我发现工作人员的工作是由事件触发的,但在我的例子中,它将从头开始,连续工作,而不是接收事件,它只会将新值推送到通道层,以便通知订阅者.所以我不确定消费者模式是否正确。总结一下我的问题:
我应该为这个任务使用消费者吗?如何设置它?考虑到 API 将被长轮询异步或同步消费者访问?在其连接方法中开始轮询外部 API 还是为此发送一次性事件?从何时何地发送此 "start working" 事件?
我也想用redis存储值,为用户提供货币初始值。他们将开始侦听连接上的更新,但更新可能会在几秒钟后出现。我可以访问通道层使用的 redis 连接实例吗?或者我是否需要为此打开另一个到我的 redis 的连接?
数据生产者的另一个选择可以将其完全保留在 Django 通道之外,如 here 所述,并将数据推送到通道层,但我不确定在部署期间达芙妮可能会出现问题。我的意思是我怎样才能确保它保持正常运行并与频道很好地共享资源?
谢谢。
Worker 适合您的用例。它们应该很长 运行 并且每个请求都没有一个新实例。如果你想让你的消费者异步,你必须确保你做的任何事情都不会阻塞。所有数据库查询都必须包含在 database_sync_to_async 中,即使数据库调用发生在调用堆栈的 5 层以下。您可以使用 Django 缓存 API 连接到 Redis,但最好在它之外工作以保持所有内容异步。直接使用 redis 库通道,因为它具有将 redis 用作缓存的异步方法。
(为了回答 Nasir 的评论和后来的访问者,这是我的完整设置)
Channels 及其工作人员确实是我项目的不错选择,而且我有一些工作方式很好。它尚未投入生产,但工作正常,代码结构良好,易于使用等。
首先我们需要设置一个 worker 并让它工作。假设我们的工作人员 class 是 ExternalData,我们将为工作人员设置一个特定的频道:
# routing.py
application = ProtocolTypeRouter({
# ...
'channel': ChannelNameRouter({
"external-data": ExternalData,
})
})
# asgi.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })
# external_data.py worker's code
# used as a singleton object
class DataStore(object):
@classmethod
async def create(cls, owner):
self = DataStore()
self.currencies = {}
self.owner = owner
# ...
return self
class ExternalData(AsyncConsumer):
started = False
# triggered from asgi.py via daphne start
async def external_data_start(self, event):
if ExternalData.started:
if settings.DEBUG:
raise RuntimeError("ExternalData already working.")
else:
logger.warning("ExternalData already working.")
return
else:
# do your initialization work here and let the data producer start listening and saving external data
ExternalData.started = True
self.store = await DataStore.create(owner=self)
当然,上面代码中的 DataStore 不是必需的,但如果您要做一些复杂的事情,最好将 ExternalData 用于与通道相关的事情,而在另一个 class 中做其他事情。使用此设置,您需要首先 运行 工人:
python manage.py runworker external-data
然后启动 daphne(即在另一个终端中查看它们的输出):
daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application
在生产中,当你需要编写服务或类似的 daphne 时,应该稍后启动(例如休眠 2-3 秒)以确保 worker 文件由 python 和 运行宁。您也可以重复尝试 asgi.py 代码(即在循环中进行一些睡眠)直到工作人员设置了一些环境标志。
现在我们的数据提供者已启动,但客户端呢?我们需要有一个消费者,它主要充当我们的数据提供者和客户之间的中介。对于我的项目,数据传输要求涵盖了大多数情况:
- A:当客户端连接时发送一些初始数据
- B:客户端可以访问一个页面,需要获取一些与该页面相关的额外数据
- C:客户端在一个页面,你需要发送实时数据并更新页面
- D: 有新数据到达,需要通知客户端
我们的是单页应用程序,这就是我们需要所有这些的原因。这是包含我如何处理所有这些情况的片段:
# consumer.py
class FeedsConsumer(AsyncJsonWebsocketConsumer):
groups = ["broadcast"] # for requirement D
# triggered from client
async def connect(self):
await self.accept()
self.listening = set() # for requirement C
logger.info(f"New client connected: {self.channel_name}")
# for requirement A
await self.channel_layer.send("external-data",
{ "type": "external.new_client", 'client_channel': self.channel_name })
# triggered from client
async def receive_json(self, data):
# for requirement B
if data["type"] == "get_currency":
payload["type"] = "external.send_currency"
payload["client_channel"] = self.channel_name
payload["currency"] = data["currency"]
self.listen(data["currency"]) # for requirement C
await self.channel_layer.send("external-data", payload)
# for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
async def listen(self, item_id):
if item_id not in self.listening:
await self.channel_layer.group_add(item_id, self.channel_name )
self.listening.add(item_id)
# below are triggered from the worker. A and B as responses. C and D as server generated messages
# for requirement A
async def init_data(self, payload):
await self.send_json(payload)
# for requirement B
async def send_currency(self, payload):
await self.send_json(payload)
# for requirement C
async def new_value(self, payload):
await self.send_json(payload)
# for requirement D
async def new_currency(self, payload):
await self.send_json(payload)
# external_data.py worker's code
class ExternalData(AsyncConsumer):
# for requirement A. triggered from consumer.
async def external_new_client(self, payload):
data_to_send = list(self.store.currencies.keys())
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # new client
{ 'type': 'init_data',
'data': data_to_send,
})
# for requirement B. triggered from consumer.
async def external_send_currency(self, payload):
data_to_send = self.store.currencies[payload["currency"]]
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # only the client who requested data
{ 'type': 'send_currency',
'data': data_to_send,
})
async def new_data_arrived(self, currency, value):
if currency not in self.store.currencies:
self.store.currencies[currency] = value
# requirement D. suppose this is new data so we need to notify all connected users of its availability
await self.channel_layer.group_send("broadcast", # all clients are in this group
{ 'type': 'new_currency',
'data': currency,
})
else:
# requirement C, notify listeners.
self.store.currencies[currency] = value
await self.channel_layer.group_send(currency, # all clients listening to this currency
{ 'type': 'new_value',
'currency': currency,
'value': value,
})
希望我没有弄乱代码,也不太复杂(我懒得 paste/edit 为每个需求单独编写代码)。有什么问题请在评论中提出。
我正在开发一个应用程序,其中推送给客户端的实时数据将来自外部 API。它的一个简单版本可以被认为是一个外汇货币追踪器。用户将指定她想要跟踪的货币(美元、欧元、英镑等)并接收实时更新。货币数据将通过长轮询来自外部 API。我的问题是如何将这个数据生产者整合到渠道中?
在所有通道示例中,我发现工作人员的工作是由事件触发的,但在我的例子中,它将从头开始,连续工作,而不是接收事件,它只会将新值推送到通道层,以便通知订阅者.所以我不确定消费者模式是否正确。总结一下我的问题:
我应该为这个任务使用消费者吗?如何设置它?考虑到 API 将被长轮询异步或同步消费者访问?在其连接方法中开始轮询外部 API 还是为此发送一次性事件?从何时何地发送此 "start working" 事件?
我也想用redis存储值,为用户提供货币初始值。他们将开始侦听连接上的更新,但更新可能会在几秒钟后出现。我可以访问通道层使用的 redis 连接实例吗?或者我是否需要为此打开另一个到我的 redis 的连接?
数据生产者的另一个选择可以将其完全保留在 Django 通道之外,如 here 所述,并将数据推送到通道层,但我不确定在部署期间达芙妮可能会出现问题。我的意思是我怎样才能确保它保持正常运行并与频道很好地共享资源?
谢谢。
Worker 适合您的用例。它们应该很长 运行 并且每个请求都没有一个新实例。如果你想让你的消费者异步,你必须确保你做的任何事情都不会阻塞。所有数据库查询都必须包含在 database_sync_to_async 中,即使数据库调用发生在调用堆栈的 5 层以下。您可以使用 Django 缓存 API 连接到 Redis,但最好在它之外工作以保持所有内容异步。直接使用 redis 库通道,因为它具有将 redis 用作缓存的异步方法。
(为了回答 Nasir 的评论和后来的访问者,这是我的完整设置)
Channels 及其工作人员确实是我项目的不错选择,而且我有一些工作方式很好。它尚未投入生产,但工作正常,代码结构良好,易于使用等。
首先我们需要设置一个 worker 并让它工作。假设我们的工作人员 class 是 ExternalData,我们将为工作人员设置一个特定的频道:
# routing.py
application = ProtocolTypeRouter({
# ...
'channel': ChannelNameRouter({
"external-data": ExternalData,
})
})
# asgi.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })
# external_data.py worker's code
# used as a singleton object
class DataStore(object):
@classmethod
async def create(cls, owner):
self = DataStore()
self.currencies = {}
self.owner = owner
# ...
return self
class ExternalData(AsyncConsumer):
started = False
# triggered from asgi.py via daphne start
async def external_data_start(self, event):
if ExternalData.started:
if settings.DEBUG:
raise RuntimeError("ExternalData already working.")
else:
logger.warning("ExternalData already working.")
return
else:
# do your initialization work here and let the data producer start listening and saving external data
ExternalData.started = True
self.store = await DataStore.create(owner=self)
当然,上面代码中的 DataStore 不是必需的,但如果您要做一些复杂的事情,最好将 ExternalData 用于与通道相关的事情,而在另一个 class 中做其他事情。使用此设置,您需要首先 运行 工人:
python manage.py runworker external-data
然后启动 daphne(即在另一个终端中查看它们的输出):
daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application
在生产中,当你需要编写服务或类似的 daphne 时,应该稍后启动(例如休眠 2-3 秒)以确保 worker 文件由 python 和 运行宁。您也可以重复尝试 asgi.py 代码(即在循环中进行一些睡眠)直到工作人员设置了一些环境标志。
现在我们的数据提供者已启动,但客户端呢?我们需要有一个消费者,它主要充当我们的数据提供者和客户之间的中介。对于我的项目,数据传输要求涵盖了大多数情况:
- A:当客户端连接时发送一些初始数据
- B:客户端可以访问一个页面,需要获取一些与该页面相关的额外数据
- C:客户端在一个页面,你需要发送实时数据并更新页面
- D: 有新数据到达,需要通知客户端
我们的是单页应用程序,这就是我们需要所有这些的原因。这是包含我如何处理所有这些情况的片段:
# consumer.py
class FeedsConsumer(AsyncJsonWebsocketConsumer):
groups = ["broadcast"] # for requirement D
# triggered from client
async def connect(self):
await self.accept()
self.listening = set() # for requirement C
logger.info(f"New client connected: {self.channel_name}")
# for requirement A
await self.channel_layer.send("external-data",
{ "type": "external.new_client", 'client_channel': self.channel_name })
# triggered from client
async def receive_json(self, data):
# for requirement B
if data["type"] == "get_currency":
payload["type"] = "external.send_currency"
payload["client_channel"] = self.channel_name
payload["currency"] = data["currency"]
self.listen(data["currency"]) # for requirement C
await self.channel_layer.send("external-data", payload)
# for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
async def listen(self, item_id):
if item_id not in self.listening:
await self.channel_layer.group_add(item_id, self.channel_name )
self.listening.add(item_id)
# below are triggered from the worker. A and B as responses. C and D as server generated messages
# for requirement A
async def init_data(self, payload):
await self.send_json(payload)
# for requirement B
async def send_currency(self, payload):
await self.send_json(payload)
# for requirement C
async def new_value(self, payload):
await self.send_json(payload)
# for requirement D
async def new_currency(self, payload):
await self.send_json(payload)
# external_data.py worker's code
class ExternalData(AsyncConsumer):
# for requirement A. triggered from consumer.
async def external_new_client(self, payload):
data_to_send = list(self.store.currencies.keys())
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # new client
{ 'type': 'init_data',
'data': data_to_send,
})
# for requirement B. triggered from consumer.
async def external_send_currency(self, payload):
data_to_send = self.store.currencies[payload["currency"]]
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # only the client who requested data
{ 'type': 'send_currency',
'data': data_to_send,
})
async def new_data_arrived(self, currency, value):
if currency not in self.store.currencies:
self.store.currencies[currency] = value
# requirement D. suppose this is new data so we need to notify all connected users of its availability
await self.channel_layer.group_send("broadcast", # all clients are in this group
{ 'type': 'new_currency',
'data': currency,
})
else:
# requirement C, notify listeners.
self.store.currencies[currency] = value
await self.channel_layer.group_send(currency, # all clients listening to this currency
{ 'type': 'new_value',
'currency': currency,
'value': value,
})
希望我没有弄乱代码,也不太复杂(我懒得 paste/edit 为每个需求单独编写代码)。有什么问题请在评论中提出。