Python 发布到 RabbitMQ exchange/queue 由 ASP.NET 核心服务使用

Python publish to RabbitMQ exchange/queue consumed by ASP.NET Core Service

我是 运行 RabbitMQ,在 Docker 容器(rabbitmq:3-管理映像)中作为 Docker Compose 应用程序的一部分。该应用程序包含一些 ASP.NET 核心 WebApi 微服务,它们通过此代理交换消息。效果很好,到目前为止没有给我带来任何问题。

现在我需要将消息从 Python 应用程序发布到 exchange/queue,它是从 ASP.NET 核心微服务之一创建的。微服务包含此队列的消费者。对于 python 的发布,我使用的是 pika。问题是,我似乎无法获得发布权。每当我执行 Python 脚本时,我都可以在 RabbitMQ 管理 UI 中看到创建了一个新的交换和队列,后缀为“_skipped”。好像我的消息被发送到那里而不是实际的队列。此外,当尝试直接从管理 UI 发布消息时,消息实际上会发送到微服务,但我会遇到一个异常,消息无法反序列化为 MassTransit 信封对象,还有一个带有“_error”后缀的新交换和队列。

我不知道问题出在哪里。我认为 exchange/queue 本身很好,因为其他 queues/consumers/publishers 在这个项目中用于微服务到微服务的通信。那么这可能是我试图从 Python 解决 exchange/queue 的方式,或者是我的邮件正文不正确。

This page gives some info about how messages need to be structured, but not too detailed, and here 我得到了关于如何使用 Python 发布的大部分信息。

下面是微服务中host/queue配置的相关代码,以及Python脚本。任何关于我如何让它工作的 help/tips 都将不胜感激。

ASP.NET核心:

// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
    services.AddMassTransit(x =>
    {
        x.AddConsumer<MappingUpdateConsumer>();
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config => 
        { 
            config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
            {
                h.Username(RabbitMqConst.RabbitMqUsername);
                h.Password(RabbitMqConst.RabbitMqPassword);
            });         
            config.ReceiveEndpoint("mappingQueue", e =>
            {
                e.ConfigureConsumer<MappingUpdateConsumer>(provider);                        
            });
        }));
    });
    services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>
{
    ...
    public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
    {
        await Task.Run(async () =>
        {
            if (context.Message == null)
            {
                return;
            }
            ...
        });
    }
}
// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage
{
    public string Message { get; set; }
}

Python:

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)

message = {
  "message"    : {
    "message": "Hello World"
  },
  "messageType": [
    "urn:message:MassTransit.Tests:ValueMessage"
  ]
}

channel.basic_publish(exchange='mappingQueue',
                      routing_key='mappingQueue',
                      body=json.dumps(message))

connection.close()
print("sent")

遇到同样问题的朋友,我终于弄明白了:

..
config.ReceiveEndpoint("mappingQueue", e =>
{
    e.ClearMessageDeserializers();
    e.UseRawJsonSerializer();
    e.ConfigureConsumer<MappingUpdateConsumer>(provider);
});
...