连续 运行 Django 中的 Celery 任务
Continously running Celery Task in Django
我有一个 Django 应用程序,它应该不断地侦听来自 Kafka 的消息,然后通过 WebSocket 将它们发送到客户端。问题是如何设置常量监听器。为了未来的可扩展性,我们决定在项目中引入 Celery 来管理这些扩展问题。
我的任务实际上是这样的:
class ConsumerTask(Task):
name = 'consume_messages'
def run(self, *args, **kwargs):
consumer = get_kafka_consumer(settings.KAFKA_URL,
settings.FAULT_MESSAGES_KAFKA_TOPIC,
'consumer_messages_group')
logger.info("Kafka's consumer has been started")
while True:
messages = consumer.poll()
for _, messages in messages.items():
messages, messages_count = self.get_message(messages)
if messages_count > 0:
messages = save_to_db()
send_via_websocket_messages(messages)
它通过WS正确保存和发送消息,但问题来自任务中的无限循环。
由于某种原因(可能是任务超时限制)任务在队列中弹出并且再也不会运行。
我不确定守护芹菜工人会解决这个问题。
您能否提供一些策略来组织 "Constantly running part" 这个过程?
您的用例不适合 celery 任务。 Celery 任务不应该是长 运行ning 进程。您需要将任务放入代理队列中,这在您的设置中也没有任何意义。
把你的 while-True 循环想象成一个 celery worker。工作人员应该不断 运行,这也是您在处理更多任务时需要扩展的过程。
写一个 Django management command 使用你的 while-True 循环并使用你将用于将 celery worker 扩展到 运行 个该管理命令的多个实例的缩放。
使用进程管理工具扩展进程,例如 honcho or supervisord。
我有一个 Django 应用程序,它应该不断地侦听来自 Kafka 的消息,然后通过 WebSocket 将它们发送到客户端。问题是如何设置常量监听器。为了未来的可扩展性,我们决定在项目中引入 Celery 来管理这些扩展问题。
我的任务实际上是这样的:
class ConsumerTask(Task):
name = 'consume_messages'
def run(self, *args, **kwargs):
consumer = get_kafka_consumer(settings.KAFKA_URL,
settings.FAULT_MESSAGES_KAFKA_TOPIC,
'consumer_messages_group')
logger.info("Kafka's consumer has been started")
while True:
messages = consumer.poll()
for _, messages in messages.items():
messages, messages_count = self.get_message(messages)
if messages_count > 0:
messages = save_to_db()
send_via_websocket_messages(messages)
它通过WS正确保存和发送消息,但问题来自任务中的无限循环。 由于某种原因(可能是任务超时限制)任务在队列中弹出并且再也不会运行。 我不确定守护芹菜工人会解决这个问题。 您能否提供一些策略来组织 "Constantly running part" 这个过程?
您的用例不适合 celery 任务。 Celery 任务不应该是长 运行ning 进程。您需要将任务放入代理队列中,这在您的设置中也没有任何意义。
把你的 while-True 循环想象成一个 celery worker。工作人员应该不断 运行,这也是您在处理更多任务时需要扩展的过程。
写一个 Django management command 使用你的 while-True 循环并使用你将用于将 celery worker 扩展到 运行 个该管理命令的多个实例的缩放。
使用进程管理工具扩展进程,例如 honcho or supervisord。