Celery 任务无法通过 channel_layer.send 调用 Channels 消费者
Celery task can not call Channels consumer via channel_layer.send
我一直在为一个相当简单的应用程序设置一个 websocket。我写了一个 JsonWebsocketConsumer 以及一些 celery 任务,以便执行一些更长的 运行 任务(数据库访问)。一切正常,但我的消费者从未接到电话。有什么想法吗?
consumers.py
class NetworkCompConsumer(JsonWebsocketConsumer):
def connect(self):
self.accept()
def disconnect(self, close_code):
pass
def receive_json(self, content):
include = content['include']
# Reduce Dimensions of Inputs to 2, Just in Case User uploads 3D
# Geojson
wkb_w = WKBWriter()
wkb_w.outdim = 2
include = GEOSGeometry(json.dumps(include))
include = GEOSGeometry(wkb_w.write_hex(include))
print("firing tasks")
genPolySize.delay(include.json, None, self.channel_name)
genBuildingCount.delay(include.json, None, self.channel_name)
# This gets sent
self.send_json({
"hello world": "Hi!!!"
})
def polygon_area(self, event):
print("poly area consumer called!!")
# This does not get sent
self.send_json(
{"area": event['value']}
)
def building_count(self, event):
print("building count consumer called!!")
# This does not get sent
self.send_json(
{"buildingCount": event['value']}
)
tasks.py
def sync_send(channelName, consumer, value):
channel_layer = get_channel_layer()
print(channel_layer)
async_to_sync(channel_layer.send)(
channelName,
{
"type": consumer,
"value": value,
}
)
@shared_task
def genBuildingCount(include, exclude, channelName):
query_skeleton = building_count_skeleton
query_skeleton = getQueryTemplate(query_skeleton, exclude is not None, False)
with connections['gis_data'].cursor() as cursor:
cursor.execute(
query_skeleton, [include, exclude] if exclude is not None else [include])
results = cursor.fetchone()
buildingCount = results[0]
sync_send(channelName, "building.count", buildingCount)
print("successfully completed genBuildingCount")
@shared_task
def genPolySize(include, exclude, channelName):
query_skeleton = poly_size_skeleton
if exclude is not None:
query_skeleton = poly_size_skeleton_exclude
with connections['gis_data'].cursor() as cursor:
query_arguments = [
include,
exclude
] if exclude is not None else [include]
cursor.execute(query_skeleton, query_arguments)
results = cursor.fetchone()
polygonArea = squaredMetersToMiles(results[0])
sync_send(channelName, "polygon.area", polygonArea)
print("successfully completed genPolySize")
日志
django-app_1 | WebSocket HANDSHAKING /ws/network-comparison/ [172.18.0.1:60460]
django-app_1 | WebSocket CONNECT /ws/network-comparison/ [172.18.0.1:60460]
celery_1 | [2020-11-10 18:00:31,916: INFO/MainProcess] Received task: NetworkComparison.tasks.genPolySize[7867638b-76db-4385-b6f6-451970b2f7f3]
celery_1 | [2020-11-10 18:00:31,921: INFO/MainProcess] Received task: NetworkComparison.tasks.genBuildingCount[875826c0-4c73-4597-9a61-5b6240a4bcfa]
celery_1 | [2020-11-10 18:00:31,949: WARNING/ForkPoolWorker-3] <channels.layers.InMemoryChannelLayer object at 0x7f0a002c59a0>
celery_1 | [2020-11-10 18:00:31,952: WARNING/ForkPoolWorker-3] successfully completed genPolySize
celery_1 | [2020-11-10 18:00:31,955: INFO/ForkPoolWorker-3] Task NetworkComparison.tasks.genPolySize[7867638b-76db-4385-b6f6-451970b2f7f3] succeeded in 0.03706242493353784s: None
celery_1 | [2020-11-10 18:00:32,050: WARNING/ForkPoolWorker-5] <channels.layers.InMemoryChannelLayer object at 0x7f0a002c9400>
celery_1 | [2020-11-10 18:00:32,053: WARNING/ForkPoolWorker-5] successfully completed genBuildingCount
celery_1 | [2020-11-10 18:00:32,056: INFO/ForkPoolWorker-5] Task NetworkComparison.tasks.genBuildingCount[875826c0-4c73-4597-9a61-5b6240a4bcfa] succeeded in 0.13415803597308695s: None
celery 任务已完成,但似乎从未调用 polygon_area
和 building_count
。我是否应该担心 2 个印刷通道层位于不同的内存位置?
谢谢!
我的问题是我使用的是内存通道层而不是 Redis:https://channels.readthedocs.io/en/stable/topics/channel_layers.html?highlight=get_channel_layer#in-memory-channel-layer
切换到 Redis 通道层解决了我的问题:https://channels.readthedocs.io/en/stable/topics/channel_layers.html?highlight=get_channel_layer#redis-channel-layer
Celery Worker 和 Django Worker 在不同的 docker 容器中,所以我猜内存中的通道层永远不会适用于这种情况。
我一直在为一个相当简单的应用程序设置一个 websocket。我写了一个 JsonWebsocketConsumer 以及一些 celery 任务,以便执行一些更长的 运行 任务(数据库访问)。一切正常,但我的消费者从未接到电话。有什么想法吗?
consumers.py
class NetworkCompConsumer(JsonWebsocketConsumer):
def connect(self):
self.accept()
def disconnect(self, close_code):
pass
def receive_json(self, content):
include = content['include']
# Reduce Dimensions of Inputs to 2, Just in Case User uploads 3D
# Geojson
wkb_w = WKBWriter()
wkb_w.outdim = 2
include = GEOSGeometry(json.dumps(include))
include = GEOSGeometry(wkb_w.write_hex(include))
print("firing tasks")
genPolySize.delay(include.json, None, self.channel_name)
genBuildingCount.delay(include.json, None, self.channel_name)
# This gets sent
self.send_json({
"hello world": "Hi!!!"
})
def polygon_area(self, event):
print("poly area consumer called!!")
# This does not get sent
self.send_json(
{"area": event['value']}
)
def building_count(self, event):
print("building count consumer called!!")
# This does not get sent
self.send_json(
{"buildingCount": event['value']}
)
tasks.py
def sync_send(channelName, consumer, value):
channel_layer = get_channel_layer()
print(channel_layer)
async_to_sync(channel_layer.send)(
channelName,
{
"type": consumer,
"value": value,
}
)
@shared_task
def genBuildingCount(include, exclude, channelName):
query_skeleton = building_count_skeleton
query_skeleton = getQueryTemplate(query_skeleton, exclude is not None, False)
with connections['gis_data'].cursor() as cursor:
cursor.execute(
query_skeleton, [include, exclude] if exclude is not None else [include])
results = cursor.fetchone()
buildingCount = results[0]
sync_send(channelName, "building.count", buildingCount)
print("successfully completed genBuildingCount")
@shared_task
def genPolySize(include, exclude, channelName):
query_skeleton = poly_size_skeleton
if exclude is not None:
query_skeleton = poly_size_skeleton_exclude
with connections['gis_data'].cursor() as cursor:
query_arguments = [
include,
exclude
] if exclude is not None else [include]
cursor.execute(query_skeleton, query_arguments)
results = cursor.fetchone()
polygonArea = squaredMetersToMiles(results[0])
sync_send(channelName, "polygon.area", polygonArea)
print("successfully completed genPolySize")
日志
django-app_1 | WebSocket HANDSHAKING /ws/network-comparison/ [172.18.0.1:60460]
django-app_1 | WebSocket CONNECT /ws/network-comparison/ [172.18.0.1:60460]
celery_1 | [2020-11-10 18:00:31,916: INFO/MainProcess] Received task: NetworkComparison.tasks.genPolySize[7867638b-76db-4385-b6f6-451970b2f7f3]
celery_1 | [2020-11-10 18:00:31,921: INFO/MainProcess] Received task: NetworkComparison.tasks.genBuildingCount[875826c0-4c73-4597-9a61-5b6240a4bcfa]
celery_1 | [2020-11-10 18:00:31,949: WARNING/ForkPoolWorker-3] <channels.layers.InMemoryChannelLayer object at 0x7f0a002c59a0>
celery_1 | [2020-11-10 18:00:31,952: WARNING/ForkPoolWorker-3] successfully completed genPolySize
celery_1 | [2020-11-10 18:00:31,955: INFO/ForkPoolWorker-3] Task NetworkComparison.tasks.genPolySize[7867638b-76db-4385-b6f6-451970b2f7f3] succeeded in 0.03706242493353784s: None
celery_1 | [2020-11-10 18:00:32,050: WARNING/ForkPoolWorker-5] <channels.layers.InMemoryChannelLayer object at 0x7f0a002c9400>
celery_1 | [2020-11-10 18:00:32,053: WARNING/ForkPoolWorker-5] successfully completed genBuildingCount
celery_1 | [2020-11-10 18:00:32,056: INFO/ForkPoolWorker-5] Task NetworkComparison.tasks.genBuildingCount[875826c0-4c73-4597-9a61-5b6240a4bcfa] succeeded in 0.13415803597308695s: None
celery 任务已完成,但似乎从未调用 polygon_area
和 building_count
。我是否应该担心 2 个印刷通道层位于不同的内存位置?
谢谢!
我的问题是我使用的是内存通道层而不是 Redis:https://channels.readthedocs.io/en/stable/topics/channel_layers.html?highlight=get_channel_layer#in-memory-channel-layer
切换到 Redis 通道层解决了我的问题:https://channels.readthedocs.io/en/stable/topics/channel_layers.html?highlight=get_channel_layer#redis-channel-layer
Celery Worker 和 Django Worker 在不同的 docker 容器中,所以我猜内存中的通道层永远不会适用于这种情况。