使用 C# 在 Artemis 上发送请求响应消息
Sending request reponse message on Artemis using C#
我正在尝试使用 ArtemisNetClient 在 C# 中实现请求响应模式,但在寻找如何在实际解决方案中以更通用的方式执行此操作时遇到了一些麻烦。
基于一些 Java 示例,我能够在两个控制台应用程序中执行类似的操作:
发件人
static async System.Threading.Tasks.Task Main(string[] args)
{
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
string guid = new Guid().ToString();
var requestAddress = "TRADE REQ1";
var responseAddress = "TRADE RESP";
Message message = new Message("BUY AMD 1000 SHARES");
message.SetCorrelationId(guid);
message.ReplyTo = responseAddress;
var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
await producer.SendAsync(message);
var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
var responseMessage = await consumer.ReceiveAsync();
Console.WriteLine(responseMessage.GetBody<string>());
}
接收器
static async System.Threading.Tasks.Task Main(string[] args)
{
// Create connection
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
var requestAddress = "TRADE REQ1";
// Create consumer to receive trade request messages
var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
var message = await consumer.ReceiveAsync();
Console.WriteLine($"Received message: {message.GetBody<string>()}");
// Confirm trade request and ssend response message
if (!string.IsNullOrEmpty(message.ReplyTo))
{
Message responseMessage = new Message("Confirmed trade request");
responseMessage.SetCorrelationId(message.CorrelationId);
var producer = await connection.CreateProducerAsync(message.ReplyTo);
await producer.SendAsync(responseMessage);
}
}
这按预期工作,但我想了解更多 this article 中描述的内容,只是它没有任何请求响应模式的示例。
详细说明一下,我目前有两个服务要进行通信。
在服务1中,我想创建并发布一条消息,然后等待响应以丰富实例对象并将其保存到数据库中。我目前有这个,但它缺少等待响应消息。
public async Task<Instance> CreateInstance(Instance instance)
{
await _instanceCollection.InsertOneAsync(instance);
var @event = new InstanceCreated
{
Id = instance.Id,
SiteUrl = instance.SiteUrl
};
await _messageProducer.PublishAsync(@event);
return instance;
}
我想我可能需要在 PublishAsync()
中设置一个临时的 queue/connection 或其他内容,并将其更改为例如Task<Message>
支持 return 发送响应消息。但是我该怎么做呢?我是否必须像在控制台应用程序示例中那样做一个新的 connectionfactory + CreateConsumerAsync
等?
public class MessageProducer
{
private readonly IAnonymousProducer _producer;
public MessageProducer(IAnonymousProducer producer)
{
_producer = producer;
}
public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = typeof(T).Name;
var msg = new Message(serialized);
if (replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = routeName;
var msg = new Message(serialized);
if(replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
}
在 Service 2 中,我有一个 InstanceCreatedConsumer
可以接收消息,但它同样无法 return 响应消息。
public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
private readonly MessageProducer _messageProducer;
public InstanceCreatedConsumer(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
{
// consume message and return response
}
}
我认为我可以使用 ConsumeAsync
和 HandleMessage
来扩展 ActiveMqExtensions
class 来处理具有 return 值的响应消息, 但我还没有走到这一步。
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
RoutingType routingType)
where TConsumer : class, ITypedConsumer<TMessage>
{
builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
return builder;
}
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
where TConsumer : class, ITypedConsumer<TMessage>
{
try
{
var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
catch(Exception ex)
{
// todo
}
}
我在这里尝试实现的目标是完全错误的,还是 ArtemisNetClient 无法实现?
也许有人有示例或者可以确认我是否走在正确的道路上,或者我应该使用不同的框架。
我对通过 ActiveMQ Artemis 等消息进行的这种通信不熟悉,因此非常感谢任何指导。
我在 ArtemisNetClient 中没有看到任何可以从应用程序的角度简化 request/response 模式的内容。人们可能期待类似于 JMS 的东西 QueueRequestor
,但我在代码中没有看到类似的东西,也没有在文档中看到类似的东西。
我建议您只需在您的应用程序中执行您在示例中所做的操作(即手动创建消费者和生产者以分别处理每一端的响应)。我建议的唯一更改是重新使用连接,这样您就可以创建尽可能少的连接。连接池在这里是理想的。
就其价值而言,在我看来,第一个 release of ArtemisNetClient was just 3 months ago and according to GitHub 除了 2 个代码库提交之外,其他所有提交都来自一个开发人员。 ArtemisNetClient 可能会成长为一个非常成功的 C# 客户端实现,但在这一点上它似乎相对不成熟。即使 if 现有代码质量很高,如果客户端周围没有坚实的社区,那么它很可能无法获得及时错误修复、新功能等所需的支持.只有时间会证明一切。
在 2.7.0 版中,ArtemisNetClient 引入了 IRequestReplyClient
接口,可用于实现 request-response 消息传递模式。对于 ArtemisNetClient.Extensions.DependencyInjection
,这可能如下所示:
客户端:
首先您需要在 DI 中注册您键入的 request-reply 客户端:
public void ConfigureServices(IServiceCollection services)
{
/*...*/
var endpoints = new[] { Endpoint.Create(host: "localhost", port: 5672, "guest", "guest") };
services.AddActiveMq("bookstore-cluster", endpoints)
.AddRequestReplyClient<MyRequestReplyClient>();
/*...*/
}
MyRequestReplyClient
是您的自定义 class,它希望通过构造函数注入 IRequestReplyClient
。一旦你有了自定义 class,你可以直接公开 IRequestReplyClient
或在其中封装发送逻辑:
public class MyRequestReplyClient
{
private readonly IRequestReplyClient _requestReplyClient;
public MyRequestReplyClient(IRequestReplyClient requestReplyClient)
{
_requestReplyClient = requestReplyClient;
}
public async Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken)
{
var serialized = JsonSerializer.Serialize(request);
var address = typeof(TRequest).Name;
var msg = new Message(serialized);
var response = await _requestReplyClient.SendAsync(address, msg, cancellationToken);
return JsonSerializer.Deserialize<TResponse>(response.GetBody<string>());
}
}
关于 client-side。
工人端
要实现工作端,您可以(按照您的建议)将 ITypedConsumer
接口更改为 return 将发回的消息,或者您可以提供附加数据(ReplyTo
和 CorrelationId
headers) 因此您可以将响应作为消费者逻辑的一部分发回。我更喜欢后者,因为在我看来它是一个更灵活的选择。
修改后的 ITypedConsumer
可能看起来像这样:
public interface ITypedConsumer<in T>
{
public Task ConsumeAsync(T message, MessageContext context, CancellationToken cancellationToken);
}
其中 MessageContext
只是一个简单的 dto:
public class MessageContext
{
public string ReplyTo { get; init; }
public string CorrelationId { get; init; }
}
HandleMessage
扩展方法:
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
where TConsumer : class, ITypedConsumer<TMessage>
{
var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
var messageContext = new MessageContext
{
ReplyTo = message.ReplyTo,
CorrelationId = message.CorrelationId
};
await typedConsumer.ConsumeAsync(msg, messageContext, token);
await consumer.AcceptAsync(message);
}
MessageProducer
也必须稍作更改,因此您可以显式传递地址和 CorrelationId
:
public class MessageProducer
{
private readonly IAnonymousProducer _producer;
public MessageProducer(IAnonymousProducer producer)
{
_producer = producer;
}
public async Task PublishAsync<T>(string address, T message, MessageContext context, CancellationToken cancellationToken)
{
var serialized = JsonSerializer.Serialize(message);
var msg = new Message(serialized);
if (!string.IsNullOrEmpty(context.CorrelationId))
{
msg.CorrelationId = context.CorrelationId;
}
await _producer.SendAsync(address, msg, cancellationToken);
}
}
最后,模范消费者可以这样工作:
public class CreateBookConsumer : ITypedConsumer<CreateBook>
{
private readonly MessageProducer _messageProducer;
public CreateBookConsumer(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
public async Task ConsumeAsync(CreateBook message, MessageContext context, CancellationToken cancellationToken)
{
var @event = new BookCreated
{
Id = Guid.NewGuid(),
Title = message.Title,
Author = message.Author,
Cost = message.Cost,
InventoryAmount = message.InventoryAmount,
UserId = message.UserId,
Timestamp = DateTime.UtcNow
};
await _messageProducer.PublishAsync(context.ReplyTo, @event, new MessageContext
{
CorrelationId = context.CorrelationId
}, cancellationToken);
}
}
我正在尝试使用 ArtemisNetClient 在 C# 中实现请求响应模式,但在寻找如何在实际解决方案中以更通用的方式执行此操作时遇到了一些麻烦。
基于一些 Java 示例,我能够在两个控制台应用程序中执行类似的操作:
发件人
static async System.Threading.Tasks.Task Main(string[] args)
{
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
string guid = new Guid().ToString();
var requestAddress = "TRADE REQ1";
var responseAddress = "TRADE RESP";
Message message = new Message("BUY AMD 1000 SHARES");
message.SetCorrelationId(guid);
message.ReplyTo = responseAddress;
var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
await producer.SendAsync(message);
var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
var responseMessage = await consumer.ReceiveAsync();
Console.WriteLine(responseMessage.GetBody<string>());
}
接收器
static async System.Threading.Tasks.Task Main(string[] args)
{
// Create connection
var connectionFactory = new ConnectionFactory();
var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
var connection = await connectionFactory.CreateAsync(endpoint);
var requestAddress = "TRADE REQ1";
// Create consumer to receive trade request messages
var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
var message = await consumer.ReceiveAsync();
Console.WriteLine($"Received message: {message.GetBody<string>()}");
// Confirm trade request and ssend response message
if (!string.IsNullOrEmpty(message.ReplyTo))
{
Message responseMessage = new Message("Confirmed trade request");
responseMessage.SetCorrelationId(message.CorrelationId);
var producer = await connection.CreateProducerAsync(message.ReplyTo);
await producer.SendAsync(responseMessage);
}
}
这按预期工作,但我想了解更多 this article 中描述的内容,只是它没有任何请求响应模式的示例。
详细说明一下,我目前有两个服务要进行通信。
在服务1中,我想创建并发布一条消息,然后等待响应以丰富实例对象并将其保存到数据库中。我目前有这个,但它缺少等待响应消息。
public async Task<Instance> CreateInstance(Instance instance)
{
await _instanceCollection.InsertOneAsync(instance);
var @event = new InstanceCreated
{
Id = instance.Id,
SiteUrl = instance.SiteUrl
};
await _messageProducer.PublishAsync(@event);
return instance;
}
我想我可能需要在 PublishAsync()
中设置一个临时的 queue/connection 或其他内容,并将其更改为例如Task<Message>
支持 return 发送响应消息。但是我该怎么做呢?我是否必须像在控制台应用程序示例中那样做一个新的 connectionfactory + CreateConsumerAsync
等?
public class MessageProducer
{
private readonly IAnonymousProducer _producer;
public MessageProducer(IAnonymousProducer producer)
{
_producer = producer;
}
public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = typeof(T).Name;
var msg = new Message(serialized);
if (replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
{
var serialized = JsonSerializer.Serialize(message);
var address = routeName;
var msg = new Message(serialized);
if(replyTo != null && correlationId != null)
{
msg.CorrelationId = correlationId;
msg.ReplyTo = replyTo;
}
await _producer.SendAsync(address, msg);
}
}
在 Service 2 中,我有一个 InstanceCreatedConsumer
可以接收消息,但它同样无法 return 响应消息。
public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
private readonly MessageProducer _messageProducer;
public InstanceCreatedConsumer(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
{
// consume message and return response
}
}
我认为我可以使用 ConsumeAsync
和 HandleMessage
来扩展 ActiveMqExtensions
class 来处理具有 return 值的响应消息, 但我还没有走到这一步。
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
RoutingType routingType)
where TConsumer : class, ITypedConsumer<TMessage>
{
builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
return builder;
}
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
where TConsumer : class, ITypedConsumer<TMessage>
{
try
{
var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
catch(Exception ex)
{
// todo
}
}
我在这里尝试实现的目标是完全错误的,还是 ArtemisNetClient 无法实现?
也许有人有示例或者可以确认我是否走在正确的道路上,或者我应该使用不同的框架。
我对通过 ActiveMQ Artemis 等消息进行的这种通信不熟悉,因此非常感谢任何指导。
我在 ArtemisNetClient 中没有看到任何可以从应用程序的角度简化 request/response 模式的内容。人们可能期待类似于 JMS 的东西 QueueRequestor
,但我在代码中没有看到类似的东西,也没有在文档中看到类似的东西。
我建议您只需在您的应用程序中执行您在示例中所做的操作(即手动创建消费者和生产者以分别处理每一端的响应)。我建议的唯一更改是重新使用连接,这样您就可以创建尽可能少的连接。连接池在这里是理想的。
就其价值而言,在我看来,第一个 release of ArtemisNetClient was just 3 months ago and according to GitHub 除了 2 个代码库提交之外,其他所有提交都来自一个开发人员。 ArtemisNetClient 可能会成长为一个非常成功的 C# 客户端实现,但在这一点上它似乎相对不成熟。即使 if 现有代码质量很高,如果客户端周围没有坚实的社区,那么它很可能无法获得及时错误修复、新功能等所需的支持.只有时间会证明一切。
在 2.7.0 版中,ArtemisNetClient 引入了 IRequestReplyClient
接口,可用于实现 request-response 消息传递模式。对于 ArtemisNetClient.Extensions.DependencyInjection
,这可能如下所示:
客户端:
首先您需要在 DI 中注册您键入的 request-reply 客户端:
public void ConfigureServices(IServiceCollection services)
{
/*...*/
var endpoints = new[] { Endpoint.Create(host: "localhost", port: 5672, "guest", "guest") };
services.AddActiveMq("bookstore-cluster", endpoints)
.AddRequestReplyClient<MyRequestReplyClient>();
/*...*/
}
MyRequestReplyClient
是您的自定义 class,它希望通过构造函数注入 IRequestReplyClient
。一旦你有了自定义 class,你可以直接公开 IRequestReplyClient
或在其中封装发送逻辑:
public class MyRequestReplyClient
{
private readonly IRequestReplyClient _requestReplyClient;
public MyRequestReplyClient(IRequestReplyClient requestReplyClient)
{
_requestReplyClient = requestReplyClient;
}
public async Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken)
{
var serialized = JsonSerializer.Serialize(request);
var address = typeof(TRequest).Name;
var msg = new Message(serialized);
var response = await _requestReplyClient.SendAsync(address, msg, cancellationToken);
return JsonSerializer.Deserialize<TResponse>(response.GetBody<string>());
}
}
关于 client-side。
工人端
要实现工作端,您可以(按照您的建议)将 ITypedConsumer
接口更改为 return 将发回的消息,或者您可以提供附加数据(ReplyTo
和 CorrelationId
headers) 因此您可以将响应作为消费者逻辑的一部分发回。我更喜欢后者,因为在我看来它是一个更灵活的选择。
修改后的 ITypedConsumer
可能看起来像这样:
public interface ITypedConsumer<in T>
{
public Task ConsumeAsync(T message, MessageContext context, CancellationToken cancellationToken);
}
其中 MessageContext
只是一个简单的 dto:
public class MessageContext
{
public string ReplyTo { get; init; }
public string CorrelationId { get; init; }
}
HandleMessage
扩展方法:
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
where TConsumer : class, ITypedConsumer<TMessage>
{
var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
var messageContext = new MessageContext
{
ReplyTo = message.ReplyTo,
CorrelationId = message.CorrelationId
};
await typedConsumer.ConsumeAsync(msg, messageContext, token);
await consumer.AcceptAsync(message);
}
MessageProducer
也必须稍作更改,因此您可以显式传递地址和 CorrelationId
:
public class MessageProducer
{
private readonly IAnonymousProducer _producer;
public MessageProducer(IAnonymousProducer producer)
{
_producer = producer;
}
public async Task PublishAsync<T>(string address, T message, MessageContext context, CancellationToken cancellationToken)
{
var serialized = JsonSerializer.Serialize(message);
var msg = new Message(serialized);
if (!string.IsNullOrEmpty(context.CorrelationId))
{
msg.CorrelationId = context.CorrelationId;
}
await _producer.SendAsync(address, msg, cancellationToken);
}
}
最后,模范消费者可以这样工作:
public class CreateBookConsumer : ITypedConsumer<CreateBook>
{
private readonly MessageProducer _messageProducer;
public CreateBookConsumer(MessageProducer messageProducer)
{
_messageProducer = messageProducer;
}
public async Task ConsumeAsync(CreateBook message, MessageContext context, CancellationToken cancellationToken)
{
var @event = new BookCreated
{
Id = Guid.NewGuid(),
Title = message.Title,
Author = message.Author,
Cost = message.Cost,
InventoryAmount = message.InventoryAmount,
UserId = message.UserId,
Timestamp = DateTime.UtcNow
};
await _messageProducer.PublishAsync(context.ReplyTo, @event, new MessageContext
{
CorrelationId = context.CorrelationId
}, cancellationToken);
}
}