如何从 C# 将任务排队到 Celery?

How can I queue a task to Celery from C#?

据我了解,像 RabbitMQ 这样的消息代理可以促进用不同 language/platform 编写的不同应用程序相互通信。因此,由于 celery 可以使用 RabbitMQ 作为消息代理,我相信我们可以将任务从任何应用程序排队到 Celery,即使生产者不是用 Python.

编写的

现在我想弄清楚如何通过 RabbitMQ 从用 C# 编写的应用程序中将任务排队到 Celery。但是我还没有找到这样的例子。

我找到的唯一接近这个的信息是this SO question

接受的答案建议使用 Celery 消息格式协议将消息从 Java 排队到 RabbitMQ。但是,答案中给出的link没有任何示例,只有消息格式。

此外,消息格式表明需要任务 ID (UUID) 才能在此协议中进行通信。我的 C# 应用程序应该如何知道芹菜任务的任务 ID?据我了解,它只能知道任务名称,而不能知道任务 ID。

据此article, celery .Net client uses default TaskScheduler that comes with .Net Framework. This knows how to generate ID for your task. This article also points to some example here

我不知道这个问题是否仍然相关,但希望这个答案对其他人有所帮助。

以下是我如何成功将任务排到 Celery example worker

  1. 您需要按照 here.

    所述在生产者(客户端)与 RabbitMQ 之间建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = username;
        factory.Password = password;
        factory.VirtualHost = virtualhost;
        factory.HostName = hostname;
        factory.Port = port;
    
        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();
    

    在默认的 RabbitMQ 配置中,只有 Guest 用户只能用于本地连接(来自 127.0.0.1)。 this 问题的答案解释了如何在 RabbitMQ 中定义用户。

  2. 下一步 - 创建回调以获取结果。此示例使用 Direct reply-to,因此应答侦听器将如下所示:

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var ansBody = ea.Body;
            var ansMessage = Encoding.UTF8.GetString(ansBody);
            Console.WriteLine(" [x] Received {0}", ansMessage);
            Console.WriteLine(" [x] Done");
        };
        channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
    
  3. 正在创建 Celery 将使用的任务消息:

        IDictionary<string, object> headers = new Dictionary<string, object>();
        headers.Add("task", "tasks.add");
        Guid id = Guid.NewGuid();
        headers.Add("id", id.ToString());
    
        IBasicProperties props = channel.CreateBasicProperties();
        props.Headers = headers;
        props.CorrelationId = (string)headers["id"];
        props.ContentEncoding = "utf-8";
        props.ContentType = "application/json";
        props.ReplyTo = "amq.rabbitmq.reply-to";
    
        object[] taskArgs = new object[] { 1, 200 };
    
        object[] arguments = new object[] { taskArgs, new object(), new object()};
    
        MemoryStream stream = new MemoryStream();
        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
        ser.WriteObject(stream, arguments);
        stream.Position = 0;
        StreamReader sr = new StreamReader(stream);
        string message = sr.ReadToEnd();
    
        var body = Encoding.UTF8.GetBytes(message);
    
  4. 最后,将消息发布到 RabbitMQ:

            channel.BasicPublish(exchange: "",
                             routingKey: "celery",
                             basicProperties: props,
                             body: body);
    

芹菜自带花。 Flower 提供了一个 REST API 来管理任务。 https://flower.readthedocs.io/en/latest/api.html#post--api-task-async-apply-(.+) 在大多数情况下,这比手动创建任务并将它们插入 MQ 更简单、更可靠。