将消息从 Python (Pika) 发布到 RabbitMQ 的正确队列,以便在 .NET (MassTransit) 中使用
Publish a message to the right queue of RabbitMQ from Python (Pika) for consumption in .NET (MassTransit)
我想将消息从基于 Python 的微服务(使用 Pika)发布到由基于 .NET 的微服务(使用 MassTransit 作为事件总线)使用的 RabbitMQ 队列。
发布者(Python类)示例代码如下:
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'))
channel = connection.channel()
channel.queue_declare(queue='queue_net', durable=True)
channel.basic_publish(exchange='', routing_key='queue_net', body='message',
properties=pika.BasicProperties(
content_type = 'text/plain',
delivery_mode = 2 # make messages persistent
))
消息没有被发送到 queue_net
队列,而是被发送到 queue_net_error
队列(从 RabbitMQ 管理屏幕)。
为了完整起见,(基于.NET的)消费者的示例代码如下:
在Startup
, ConfigureServices
:
services.AddMassTransit(config =>
{
config.AddConsumer<SampleConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("amqp://rabbitmq:5672", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("queue_net", ep =>
{
ep.ConfigureConsumer<SampleConsumer>(ctx);
});
});
});
其中 SampleConsumer
定义为:
public class SampleConsumer : IConsumer<string>
{
public Task Consume(ConsumeContext<string> context)
{
Console.WriteLine(context);
return Task.CompletedTask;
}
}
我做错了什么?我可以做相反的事情,即从 .NET 发布并在 Python.
中消费
非常感谢。
您可以使用 supported message format 和内容类型,以便 MassTransit 可以使用该消息,或者您可以使用 MassTransit 中的原始 JSON 消息反序列化器来使用来自其他语言的消息发送原始 JSON.
当无法处理消息时,消息最终进入 _error 队列,这可能是序列化错误或消费者抛出的异常。当消息能够被反序列化但没有消费者实际使用消息(由于消息类型不匹配等)时,消息最终进入 _skipped 队列。
我想将消息从基于 Python 的微服务(使用 Pika)发布到由基于 .NET 的微服务(使用 MassTransit 作为事件总线)使用的 RabbitMQ 队列。
发布者(Python类)示例代码如下:
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'))
channel = connection.channel()
channel.queue_declare(queue='queue_net', durable=True)
channel.basic_publish(exchange='', routing_key='queue_net', body='message',
properties=pika.BasicProperties(
content_type = 'text/plain',
delivery_mode = 2 # make messages persistent
))
消息没有被发送到 queue_net
队列,而是被发送到 queue_net_error
队列(从 RabbitMQ 管理屏幕)。
为了完整起见,(基于.NET的)消费者的示例代码如下:
在Startup
, ConfigureServices
:
services.AddMassTransit(config =>
{
config.AddConsumer<SampleConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("amqp://rabbitmq:5672", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("queue_net", ep =>
{
ep.ConfigureConsumer<SampleConsumer>(ctx);
});
});
});
其中 SampleConsumer
定义为:
public class SampleConsumer : IConsumer<string>
{
public Task Consume(ConsumeContext<string> context)
{
Console.WriteLine(context);
return Task.CompletedTask;
}
}
我做错了什么?我可以做相反的事情,即从 .NET 发布并在 Python.
中消费非常感谢。
您可以使用 supported message format 和内容类型,以便 MassTransit 可以使用该消息,或者您可以使用 MassTransit 中的原始 JSON 消息反序列化器来使用来自其他语言的消息发送原始 JSON.
当无法处理消息时,消息最终进入 _error 队列,这可能是序列化错误或消费者抛出的异常。当消息能够被反序列化但没有消费者实际使用消息(由于消息类型不匹配等)时,消息最终进入 _skipped 队列。