将消息从 Python (Pika) 发布到 RabbitMQ 的正确队列,以便在 .NET (MassTransit) 中使用

Publish a message to the right queue of RabbitMQ from Python (Pika) for consumption in .NET (MassTransit)

我想将消息从基于 Python 的微服务(使用 Pika)发布到由基于 .NET 的微服务(使用 MassTransit 作为事件总线)使用的 RabbitMQ 队列。

发布者(Python类)示例代码如下:

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'))

channel = connection.channel()
channel.queue_declare(queue='queue_net', durable=True)

channel.basic_publish(exchange='', routing_key='queue_net', body='message',
properties=pika.BasicProperties(
    content_type = 'text/plain',
    delivery_mode = 2 # make messages persistent
))

消息没有被发送到 queue_net 队列,而是被发送到 queue_net_error 队列(从 RabbitMQ 管理屏幕)。

为了完整起见,(基于.NET的)消费者的示例代码如下:

Startup, ConfigureServices:

services.AddMassTransit(config =>
{
    config.AddConsumer<SampleConsumer>();
    config.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("amqp://rabbitmq:5672", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ReceiveEndpoint("queue_net", ep =>
        {
            ep.ConfigureConsumer<SampleConsumer>(ctx);
        });
    });
});

其中 SampleConsumer 定义为:

public class SampleConsumer : IConsumer<string>
{
    public Task Consume(ConsumeContext<string> context)
    {
        Console.WriteLine(context);
        return Task.CompletedTask;
    }
}

我做错了什么?我可以做相反的事情,即从 .NET 发布并在 Python.

中消费

非常感谢。

您可以使用 supported message format 和内容类型,以便 MassTransit 可以使用该消息,或者您可以使用 MassTransit 中的原始 JSON 消息反序列化器来使用来自其他语言的消息发送原始 JSON.

当无法处理消息时,消息最终进入 _error 队列,这可能是序列化错误或消费者抛出的异常。当消息能够被反序列化但没有消费者实际使用消息(由于消息类型不匹配等)时,消息最终进入 _skipped 队列。