如何使用 Ruby 推送 Celery 任务

How to push Celery tasks with Ruby

在我们的应用程序中,我们有一个 Rails 应用程序可以对 Flask API 进行 API 调用。 API然后调度任务,我们使用Redis作为队列。

@app.route("/api/article", methods=["POST"])
def api_article():
    app.logger.info("[route] /api/article")
     .
     .
    res_id = celery.send_task("tasks.text_stats", args=[article], kwargs={}).id
@celery.task(name="tasks.text_stats")
def text_stats(article):
    logger.info("text_stats")

我的想法是,如果 Rails 和 Flask 都可以访问 Redis 数据库,我们可以直接在 Redis 中创建任务,然后由 Celery worker 获取。

问题:

  1. 如何使用 Ruby 在 Redis 中创建 Celery 作业?
  2. 是否有 Ruby 支持此功能的库?

为了解决这个问题,我们可以简单地在Redis中检查Celery如何将任务推送到LIST

这是一个由 Celery 推送到 Redis 的 Celery 任务示例:

{'body': 'W1siYjVmNDhmNDQtOWZiZS00MjdiLWE3NDMtZjc3MDgwMDQzN2ZiIl0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'my_python_function', 'id': '64148470-0dde-4e31-ab63-09c98955d24f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '64148470-0dde-4e31-ab63-09c98955d24f', 'parent_id': None, 'argsrepr': "('my_string_argument_one', 'my_string_argument_2',)", 'kwargsrepr': '{}', 'origin': 'gen73965@mac.local'}, 'properties': {'correlation_id': '64148470-0dde-4e31-ab63-09c98955d24f', 'reply_to': '1a223e51-77c3-37d3-a870-baae15b3b741', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'f840868a-18c9-4b1f-bc95-5ff95666ca21'}}

上面比较重要的键是headers.task,就是你需要调用的python函数名,还有headers.argsreprheaders.kwargsrepr,是一个字符串化的 tuple/dict,表示您需要传递给函数的 args/kwargs。

上面 dict 中还有其他键,例如 routing_key(芹菜队列)、几个 id(您可以自动生成这些)、eta,您可以使用它们自定义行为。

一旦我们了解了 celery 如何与 Redis 交互,类似于 this example,您可以简单地 运行 LPUSH 与 Ruby 的 Redis 客户端与字符串化 python dict.

实际上,更麻烦的部分是 celery 期望字符串化的 LIST 项采用 dict 格式,这与 Ruby 的 to_json 有一些差异。您需要手动执行该部分(例如将 booleans 转换为 python bool