如何按名称调用 Celery 任务
How to invoke Celery task by name
我们有一个使用 Celery 的 Python 应用程序,以 RabbitMQ 作为代理。将此应用程序视为管理应用程序,仅将 messages/tasks 放入 Broker 中,不会对其进行操作。
将有另一个应用程序(可能基于也可能不基于 Python)根据消息执行操作。
当代码库中不存在该任务时,管理应用程序是否可以将 message/task 放入队列?如果是这样,我将如何处理?
您可以使用 AMQP 客户端向队列发送新消息,可以在 rabbitMQ docs.
中找到现有消息的列表
具体怎么做取决于您将使用的客户端和语言,但原则上一切都是为了发送符合 celery 使用的协议的消息:
# sample message shown in the celery docs
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
您可以在 celery 文档中找到更详细的消息描述 here。
例如,如果使用 kombu 客户端(python 也被 celery 使用),你将执行如下操作:
with Connection("my_broker_url") as connection:
queue = connection.SimpleQueue(queue_name)
message = {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
queue.put(message,
serializer="json")
这是 kombu docs
中的一个高度精简的示例
再搜索一下后跟进
所以我完全忘了提 flower,这是一个很棒的 celery 监控和管理工具。它作为守护进程运行,并公开 Web 界面和漂亮的 Rest API!
我发现你可以从花API发送任务,就这么简单:
POST /api/task/send-task/tasks.add HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:5555
{
"args": [1, 2]
}
有几个 API 端点可以让您执行此操作,但有一些不同:
但在这种情况下,您可能希望使用 send-task
,因为它明确指出您不需要任务源。
希望对您有所帮助!
还有一种更"Celery-esque"的方式叫做Signatures。设置指向同一代理的 Celery 应用程序并为您的任务创建签名:
from celery import Celery
celeryapp = Celery(...)
my_task = celeryapp.signature('task.add')
result = my_task.delay(2, 2)
print result.get()
- 创建
Celery
的新实例或从模块导入。
调用send_task()
方法。
from twittershell.core import app # Celery Instance from a module
async_result = app.send_task(name='tasks.followers', args=(529675892, ))
send_task
方法的签名是:
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, producer=None, connection=None,
router=None, result_cls=None, expires=None,
publisher=None, link=None, link_error=None,
add_to_parent=True, reply_to=None, **options):
我们有一个使用 Celery 的 Python 应用程序,以 RabbitMQ 作为代理。将此应用程序视为管理应用程序,仅将 messages/tasks 放入 Broker 中,不会对其进行操作。
将有另一个应用程序(可能基于也可能不基于 Python)根据消息执行操作。
当代码库中不存在该任务时,管理应用程序是否可以将 message/task 放入队列?如果是这样,我将如何处理?
您可以使用 AMQP 客户端向队列发送新消息,可以在 rabbitMQ docs.
中找到现有消息的列表具体怎么做取决于您将使用的客户端和语言,但原则上一切都是为了发送符合 celery 使用的协议的消息:
# sample message shown in the celery docs
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
您可以在 celery 文档中找到更详细的消息描述 here。
例如,如果使用 kombu 客户端(python 也被 celery 使用),你将执行如下操作:
with Connection("my_broker_url") as connection:
queue = connection.SimpleQueue(queue_name)
message = {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
queue.put(message,
serializer="json")
这是 kombu docs
中的一个高度精简的示例再搜索一下后跟进
所以我完全忘了提 flower,这是一个很棒的 celery 监控和管理工具。它作为守护进程运行,并公开 Web 界面和漂亮的 Rest API!
我发现你可以从花API发送任务,就这么简单:
POST /api/task/send-task/tasks.add HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:5555
{
"args": [1, 2]
}
有几个 API 端点可以让您执行此操作,但有一些不同:
但在这种情况下,您可能希望使用 send-task
,因为它明确指出您不需要任务源。
希望对您有所帮助!
还有一种更"Celery-esque"的方式叫做Signatures。设置指向同一代理的 Celery 应用程序并为您的任务创建签名:
from celery import Celery
celeryapp = Celery(...)
my_task = celeryapp.signature('task.add')
result = my_task.delay(2, 2)
print result.get()
- 创建
Celery
的新实例或从模块导入。 调用
send_task()
方法。from twittershell.core import app # Celery Instance from a module async_result = app.send_task(name='tasks.followers', args=(529675892, ))
send_task
方法的签名是:
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, producer=None, connection=None,
router=None, result_cls=None, expires=None,
publisher=None, link=None, link_error=None,
add_to_parent=True, reply_to=None, **options):