Python 发布到 RabbitMQ exchange/queue 由 ASP.NET 核心服务使用
Python publish to RabbitMQ exchange/queue consumed by ASP.NET Core Service
我是 运行 RabbitMQ,在 Docker 容器(rabbitmq:3-管理映像)中作为 Docker Compose 应用程序的一部分。该应用程序包含一些 ASP.NET 核心 WebApi 微服务,它们通过此代理交换消息。效果很好,到目前为止没有给我带来任何问题。
现在我需要将消息从 Python 应用程序发布到 exchange/queue,它是从 ASP.NET 核心微服务之一创建的。微服务包含此队列的消费者。对于 python 的发布,我使用的是 pika。问题是,我似乎无法获得发布权。每当我执行 Python 脚本时,我都可以在 RabbitMQ 管理 UI 中看到创建了一个新的交换和队列,后缀为“_skipped”。好像我的消息被发送到那里而不是实际的队列。此外,当尝试直接从管理 UI 发布消息时,消息实际上会发送到微服务,但我会遇到一个异常,消息无法反序列化为 MassTransit 信封对象,还有一个带有“_error”后缀的新交换和队列。
我不知道问题出在哪里。我认为 exchange/queue 本身很好,因为其他 queues/consumers/publishers 在这个项目中用于微服务到微服务的通信。那么这可能是我试图从 Python 解决 exchange/queue 的方式,或者是我的邮件正文不正确。
This page gives some info about how messages need to be structured, but not too detailed, and here 我得到了关于如何使用 Python 发布的大部分信息。
下面是微服务中host/queue配置的相关代码,以及Python脚本。任何关于我如何让它工作的 help/tips 都将不胜感激。
ASP.NET核心:
// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
services.AddMassTransit(x =>
{
x.AddConsumer<MappingUpdateConsumer>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
{
h.Username(RabbitMqConst.RabbitMqUsername);
h.Password(RabbitMqConst.RabbitMqPassword);
});
config.ReceiveEndpoint("mappingQueue", e =>
{
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
});
}));
});
services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>
{
...
public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
{
await Task.Run(async () =>
{
if (context.Message == null)
{
return;
}
...
});
}
}
// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage
{
public string Message { get; set; }
}
Python:
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)
message = {
"message" : {
"message": "Hello World"
},
"messageType": [
"urn:message:MassTransit.Tests:ValueMessage"
]
}
channel.basic_publish(exchange='mappingQueue',
routing_key='mappingQueue',
body=json.dumps(message))
connection.close()
print("sent")
遇到同样问题的朋友,我终于弄明白了:
..
config.ReceiveEndpoint("mappingQueue", e =>
{
e.ClearMessageDeserializers();
e.UseRawJsonSerializer();
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
});
...
我是 运行 RabbitMQ,在 Docker 容器(rabbitmq:3-管理映像)中作为 Docker Compose 应用程序的一部分。该应用程序包含一些 ASP.NET 核心 WebApi 微服务,它们通过此代理交换消息。效果很好,到目前为止没有给我带来任何问题。
现在我需要将消息从 Python 应用程序发布到 exchange/queue,它是从 ASP.NET 核心微服务之一创建的。微服务包含此队列的消费者。对于 python 的发布,我使用的是 pika。问题是,我似乎无法获得发布权。每当我执行 Python 脚本时,我都可以在 RabbitMQ 管理 UI 中看到创建了一个新的交换和队列,后缀为“_skipped”。好像我的消息被发送到那里而不是实际的队列。此外,当尝试直接从管理 UI 发布消息时,消息实际上会发送到微服务,但我会遇到一个异常,消息无法反序列化为 MassTransit 信封对象,还有一个带有“_error”后缀的新交换和队列。
我不知道问题出在哪里。我认为 exchange/queue 本身很好,因为其他 queues/consumers/publishers 在这个项目中用于微服务到微服务的通信。那么这可能是我试图从 Python 解决 exchange/queue 的方式,或者是我的邮件正文不正确。
This page gives some info about how messages need to be structured, but not too detailed, and here 我得到了关于如何使用 Python 发布的大部分信息。
下面是微服务中host/queue配置的相关代码,以及Python脚本。任何关于我如何让它工作的 help/tips 都将不胜感激。
ASP.NET核心:
// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
services.AddMassTransit(x =>
{
x.AddConsumer<MappingUpdateConsumer>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
{
h.Username(RabbitMqConst.RabbitMqUsername);
h.Password(RabbitMqConst.RabbitMqPassword);
});
config.ReceiveEndpoint("mappingQueue", e =>
{
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
});
}));
});
services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>
{
...
public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
{
await Task.Run(async () =>
{
if (context.Message == null)
{
return;
}
...
});
}
}
// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage
{
public string Message { get; set; }
}
Python:
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)
message = {
"message" : {
"message": "Hello World"
},
"messageType": [
"urn:message:MassTransit.Tests:ValueMessage"
]
}
channel.basic_publish(exchange='mappingQueue',
routing_key='mappingQueue',
body=json.dumps(message))
connection.close()
print("sent")
遇到同样问题的朋友,我终于弄明白了:
..
config.ReceiveEndpoint("mappingQueue", e =>
{
e.ClearMessageDeserializers();
e.UseRawJsonSerializer();
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
});
...