通过基础发布时未执行消费者代码 class
No Consumer code executed when publishing via a base class
我有一个基础 class IntegrationEvent
以及从中继承的所有其他业务事件。
public abstract class IntegrationEvent
{
public Guid Id { get; private set; }
public DateTimeOffset OccuredOn { get; private set; }
protected IntegrationEvent()
{
this.Id = Guid.NewGuid();
this.OccuredOn = DateTimeOffset.Now;
}
}
public sealed class StudentRegisteredIntegrationEvent : IntegrationEvent
{
public Guid StudentId { get; set; }
public string FullName { get; set; }
public StudentRegisteredIntegrationEvent(Guid studentId, string fullName)
{
StudentId = studentId;
FullName = fullName;
}
}
然后我创建了一个消费者:
public sealed class StudentRegisteredConsumer: IConsumer<StudentRegisteredIntegrationEvent>
{
private readonly ILogger<StudentRegisteredConsumer> _logger;
public StudentRegisteredConsumer(ILogger<StudentRegisteredConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<StudentRegisteredIntegrationEvent> context)
{
_logger.LogWarning("========== Message Received +==========================");
_logger.LogInformation($"Sending notification to {context.Message.FullName}");
_logger.LogWarning("========== Message Received +==========================");
return Task.CompletedTask;
}
}
在生产者方面,我有一个 List<IntegrationEvent>
列表,我通过 IPublishEndpoint
publish
它们,但它没有路由到正确的队列,而是创建了另一个交换,Sample.Abstraction.Domain:IntegrationEvent
。我怎样才能告诉 MassTransit 不要使用基础 class 而是使用真实类型 class?我也试过 ISendEndpointProvider
但它们又被路由到另一个队列,student-registered-integration-event_skipped
队列,因为 base
class.
没有消费者可用
这里是消费者端的日志:
[00:33:21 DBG] Configuring endpoint student-registered-integration-event, Consumer: Sample.University.Notification.Consumers.StudentRegisteredConsumer
[00:33:21 DBG] Declare exchange: name: student-registered-integration-event, type: fanout, durable
[00:33:21 DBG] Declare exchange: name: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, type: fanout, durable
[00:33:21 DBG] Bind exchange: source: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, destination: student-registered-integration-event
[00:33:21 DBG] Declare queue: name: student-registered-integration-event, durable
[00:33:21 DBG] Bind queue: source: student-registered-integration-event, destination: student-registered-integration-event
[00:33:21 DBG] Prefetch Count: 16
[00:33:21 DBG] Consumer Ok: rabbitmq://localhost/wrapperizer/student-registered-integration-event - amq.ctag-nqSrJ0A5UQZCXg3tIr9Hfg
我不知道怎么配置,我也用了ConsumerDefinition<StudentRegisteredConsumer>
但是没用,这里是代码:
public sealed class StudentRegisteredConsumerDefinition : ConsumerDefinition<StudentRegisteredConsumer>
{
public StudentRegisteredConsumerDefinition()
{
const string eventName = nameof(StudentRegisteredIntegrationEvent);
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
this.EndpointName = sanitized;
}
}
在生产者端获取发送端点的 uri:
var eventName = logEvt.IntegrationEvent.GetType().Name;
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
var uri = new Uri($"exchange:{sanitized}");
var sender = await _sendEndpointProvider.GetSendEndpoint(uri);
await sender.Send(logEvt.IntegrationEvent);
我知道上面的代码有点像 MT 的默认行为,但如果没有它,我就没有正确的队列和交换。任何解决方案将不胜感激。
首先,您完全可以使用 Publish
来完成此操作,无需配置您已完成的所有操作来尝试解决该问题。您可以按照约定配置您的消费者,并让他们在自己的端点上进行配置。您错过的一部分是消息发布的方式。
根据您的 List<IntegrationEvent>
,我怀疑您调用 Publish<T>(T message)
,其中 T
被推断为 IntegrationEvent
。这就是为什么您只在该交易所收到消息的原因。您需要使用 Publish(object message)
重载,以便确定适当的类型。
您可以简单地将列表中的每条消息分配给对象,然后用该对象调用 Publish
。或者,您可以强制使用重载:
await Task.WhenAll(events.Select(x => bus.Publish((object)x, x.GetType()));
这样,MassTransit 将使用对象类型来调用适当的泛型重载。
我有一个基础 class IntegrationEvent
以及从中继承的所有其他业务事件。
public abstract class IntegrationEvent
{
public Guid Id { get; private set; }
public DateTimeOffset OccuredOn { get; private set; }
protected IntegrationEvent()
{
this.Id = Guid.NewGuid();
this.OccuredOn = DateTimeOffset.Now;
}
}
public sealed class StudentRegisteredIntegrationEvent : IntegrationEvent
{
public Guid StudentId { get; set; }
public string FullName { get; set; }
public StudentRegisteredIntegrationEvent(Guid studentId, string fullName)
{
StudentId = studentId;
FullName = fullName;
}
}
然后我创建了一个消费者:
public sealed class StudentRegisteredConsumer: IConsumer<StudentRegisteredIntegrationEvent>
{
private readonly ILogger<StudentRegisteredConsumer> _logger;
public StudentRegisteredConsumer(ILogger<StudentRegisteredConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<StudentRegisteredIntegrationEvent> context)
{
_logger.LogWarning("========== Message Received +==========================");
_logger.LogInformation($"Sending notification to {context.Message.FullName}");
_logger.LogWarning("========== Message Received +==========================");
return Task.CompletedTask;
}
}
在生产者方面,我有一个 List<IntegrationEvent>
列表,我通过 IPublishEndpoint
publish
它们,但它没有路由到正确的队列,而是创建了另一个交换,Sample.Abstraction.Domain:IntegrationEvent
。我怎样才能告诉 MassTransit 不要使用基础 class 而是使用真实类型 class?我也试过 ISendEndpointProvider
但它们又被路由到另一个队列,student-registered-integration-event_skipped
队列,因为 base
class.
这里是消费者端的日志:
[00:33:21 DBG] Configuring endpoint student-registered-integration-event, Consumer: Sample.University.Notification.Consumers.StudentRegisteredConsumer
[00:33:21 DBG] Declare exchange: name: student-registered-integration-event, type: fanout, durable
[00:33:21 DBG] Declare exchange: name: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, type: fanout, durable
[00:33:21 DBG] Bind exchange: source: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, destination: student-registered-integration-event
[00:33:21 DBG] Declare queue: name: student-registered-integration-event, durable
[00:33:21 DBG] Bind queue: source: student-registered-integration-event, destination: student-registered-integration-event
[00:33:21 DBG] Prefetch Count: 16
[00:33:21 DBG] Consumer Ok: rabbitmq://localhost/wrapperizer/student-registered-integration-event - amq.ctag-nqSrJ0A5UQZCXg3tIr9Hfg
我不知道怎么配置,我也用了ConsumerDefinition<StudentRegisteredConsumer>
但是没用,这里是代码:
public sealed class StudentRegisteredConsumerDefinition : ConsumerDefinition<StudentRegisteredConsumer>
{
public StudentRegisteredConsumerDefinition()
{
const string eventName = nameof(StudentRegisteredIntegrationEvent);
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
this.EndpointName = sanitized;
}
}
在生产者端获取发送端点的 uri:
var eventName = logEvt.IntegrationEvent.GetType().Name;
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
var uri = new Uri($"exchange:{sanitized}");
var sender = await _sendEndpointProvider.GetSendEndpoint(uri);
await sender.Send(logEvt.IntegrationEvent);
我知道上面的代码有点像 MT 的默认行为,但如果没有它,我就没有正确的队列和交换。任何解决方案将不胜感激。
首先,您完全可以使用 Publish
来完成此操作,无需配置您已完成的所有操作来尝试解决该问题。您可以按照约定配置您的消费者,并让他们在自己的端点上进行配置。您错过的一部分是消息发布的方式。
根据您的 List<IntegrationEvent>
,我怀疑您调用 Publish<T>(T message)
,其中 T
被推断为 IntegrationEvent
。这就是为什么您只在该交易所收到消息的原因。您需要使用 Publish(object message)
重载,以便确定适当的类型。
您可以简单地将列表中的每条消息分配给对象,然后用该对象调用 Publish
。或者,您可以强制使用重载:
await Task.WhenAll(events.Select(x => bus.Publish((object)x, x.GetType()));
这样,MassTransit 将使用对象类型来调用适当的泛型重载。