使用 Axon 4 从 AMQP 接收事件
Receive events from AMQP with Axon 4
我正在尝试通过 rabbitmq 将消息发送到基于 axon4 spring 引导的系统。收到消息但未触发任何事件。我很确定我遗漏了一个重要的部分,但直到现在我还没有弄明白。
这里是我的相关部分application.yml
axon:
amqp:
exchange: axon.fanout
transaction-mode: publisher_ack
# adding the following lines changed nothing
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
spring:
rabbitmq:
username: rabbit
password: rabbit
从文档中我发现我应该创建一个 SpringAMQPMessageSource bean:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class AxonConfig {
@Bean
SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
return new SpringAMQPMessageSource(messageConverter) {
@RabbitListener(queues = "in.queue")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
super.onMessage(message, channel);
}
};
}
}
如果我从 rabbitmq 管理面板向队列发送消息,我会看到日志:
AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]
此处应接收事件的聚合:
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@CommandHandler
public OrderAggregate(final PlaceOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
}
@CommandHandler
public void handle(final ConfirmOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(final ShipOrderCommand command) {
log.debug("command: {}", command);
if (!orderConfirmed) {
throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
}
apply(new OrderShippedEvent(orderId));
}
@EventSourcingHandler
public void on(final OrderPlacedEvent event) {
log.debug("event: {}", event);
this.orderId = event.getOrderId();
orderConfirmed = false;
}
@EventSourcingHandler
public void on(final OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
@EventSourcingHandler
public void on(final OrderShippedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
protected OrderAggregate() {
}
}
所以问题是系统收到消息但没有触发任何事件。消息的内容似乎无关紧要。无论我向队列发送什么,我只会从我的 onMessage 方法中收到一条日志消息。
SpringAMQPMessageSource 的 JavaDoc 是这样说的:
/**
* MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
* <p>
* The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
* to all subscribed processors.
* <p>
* Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
* be consumed from the AMQP Queue without any processor processing them.
*
* @author Allard Buijze
* @since 3.0
*/
但是到现在我都不知道在哪里注册,怎么注册。
我的配置中的 axon.eventhandling 条目和聚合中的 @ProcessingGroup("amqpEvents") 已经来自测试。但是有没有这些条目根本没有区别。也试过没有 mode=subscribing.
确切版本:Spring Boot 2.1.4,Axon 4.1.1,axon-amqp-spring-boot-autoconfigure 4.1
非常感谢任何帮助或提示。
2019 年 4 月 23 日更新:
我试过这样写自己的class:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {
private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
private final AMQPMessageConverter messageConverter;
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
eventProcessors.add(messageProcessor);
log.debug("subscribe to: {}", messageProcessor);
return () -> eventProcessors.remove(messageProcessor);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
log.debug("eventProcessors: {}", eventProcessors);
if (!eventProcessors.isEmpty()) {
messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
.ifPresent(event -> eventProcessors.forEach(
ep -> ep.accept(Collections.singletonList(event))
));
}
}
}
结果相同,日志现在证明 eventProcessors 只是空的。
eventProcessors: []
所以问题是,如何正确注册事件处理器。有没有办法用 spring 正确地做到这一点?
更新 2:
也没有运气:
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
try {
final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
eventProcessorsField.setAccessible(true);
final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
log.debug("eventProcessors: {}", eventProcessors);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
除上述之外,以编程方式注册它也没有帮助:
@Autowired
void configure(EventProcessingModule epm,
RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
}
当然,@ProcessingGroup("amqpEvents") 在我的 class 中就位,其中包含 @EventSourcingHandler 注释方法。
更新 25.4.19:
查看 Allard 接受的答案。非常感谢指出我犯的错误:我错过了 EventSourcingHandler 不接收来自外部的消息。这是为了预测。不用于分发聚合! 上升
这里 config/classes 现在正在从 rabbitmq 接收事件:
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {
private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();
@EventHandler
public void on(OrderPlacedEvent event) {
log.debug("event: {}", event);
String orderId = event.getOrderId();
orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
}
@EventHandler
public void on(OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderConfirmed();
return orderedProduct;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderShipped();
return orderedProduct;
});
}
@QueryHandler
public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
log.debug("query: {}", query);
return new ArrayList<>(orderedProducts.values());
}
}
我当然从聚合中删除了@ProcessingGroup。
我的日志:
RabbitMQSpringAMQPMessageSource : received message: ...
OrderedProductsEventHandler : event: OrderShippedEvent...
在 Axon 中,聚合不接收来自 "outside" 的事件。聚合中的事件处理程序(更具体地说,它们是 EventSourcingHandlers)仅处理由同一聚合实例发布的事件,以便它可以重建其先前状态。
只有外部事件处理程序(例如更新投影的事件处理程序)才会从外部源接收事件。
为此,您的 application.yml 应该提及 bean 名称作为处理器的来源,而不是队列名称。所以在你的第一个例子中:
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
应该变成:
eventhandling:
processors:
amqpEvents:
source: inputMessageSource
mode: subscribing
但同样,这仅适用于在组件上定义的事件处理程序,不适用于聚合。
我正在尝试通过 rabbitmq 将消息发送到基于 axon4 spring 引导的系统。收到消息但未触发任何事件。我很确定我遗漏了一个重要的部分,但直到现在我还没有弄明白。
这里是我的相关部分application.yml
axon:
amqp:
exchange: axon.fanout
transaction-mode: publisher_ack
# adding the following lines changed nothing
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
spring:
rabbitmq:
username: rabbit
password: rabbit
从文档中我发现我应该创建一个 SpringAMQPMessageSource bean:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class AxonConfig {
@Bean
SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
return new SpringAMQPMessageSource(messageConverter) {
@RabbitListener(queues = "in.queue")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
super.onMessage(message, channel);
}
};
}
}
如果我从 rabbitmq 管理面板向队列发送消息,我会看到日志:
AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]
此处应接收事件的聚合:
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@CommandHandler
public OrderAggregate(final PlaceOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
}
@CommandHandler
public void handle(final ConfirmOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(final ShipOrderCommand command) {
log.debug("command: {}", command);
if (!orderConfirmed) {
throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
}
apply(new OrderShippedEvent(orderId));
}
@EventSourcingHandler
public void on(final OrderPlacedEvent event) {
log.debug("event: {}", event);
this.orderId = event.getOrderId();
orderConfirmed = false;
}
@EventSourcingHandler
public void on(final OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
@EventSourcingHandler
public void on(final OrderShippedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
protected OrderAggregate() {
}
}
所以问题是系统收到消息但没有触发任何事件。消息的内容似乎无关紧要。无论我向队列发送什么,我只会从我的 onMessage 方法中收到一条日志消息。
SpringAMQPMessageSource 的 JavaDoc 是这样说的:
/**
* MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
* <p>
* The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
* to all subscribed processors.
* <p>
* Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
* be consumed from the AMQP Queue without any processor processing them.
*
* @author Allard Buijze
* @since 3.0
*/
但是到现在我都不知道在哪里注册,怎么注册。
我的配置中的 axon.eventhandling 条目和聚合中的 @ProcessingGroup("amqpEvents") 已经来自测试。但是有没有这些条目根本没有区别。也试过没有 mode=subscribing.
确切版本:Spring Boot 2.1.4,Axon 4.1.1,axon-amqp-spring-boot-autoconfigure 4.1
非常感谢任何帮助或提示。
2019 年 4 月 23 日更新:
我试过这样写自己的class:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {
private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
private final AMQPMessageConverter messageConverter;
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
eventProcessors.add(messageProcessor);
log.debug("subscribe to: {}", messageProcessor);
return () -> eventProcessors.remove(messageProcessor);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
log.debug("eventProcessors: {}", eventProcessors);
if (!eventProcessors.isEmpty()) {
messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
.ifPresent(event -> eventProcessors.forEach(
ep -> ep.accept(Collections.singletonList(event))
));
}
}
}
结果相同,日志现在证明 eventProcessors 只是空的。
eventProcessors: []
所以问题是,如何正确注册事件处理器。有没有办法用 spring 正确地做到这一点?
更新 2:
也没有运气:
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
try {
final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
eventProcessorsField.setAccessible(true);
final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
log.debug("eventProcessors: {}", eventProcessors);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
除上述之外,以编程方式注册它也没有帮助:
@Autowired
void configure(EventProcessingModule epm,
RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
}
当然,@ProcessingGroup("amqpEvents") 在我的 class 中就位,其中包含 @EventSourcingHandler 注释方法。
更新 25.4.19:
查看 Allard 接受的答案。非常感谢指出我犯的错误:我错过了 EventSourcingHandler 不接收来自外部的消息。这是为了预测。不用于分发聚合! 上升 这里 config/classes 现在正在从 rabbitmq 接收事件:
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {
private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();
@EventHandler
public void on(OrderPlacedEvent event) {
log.debug("event: {}", event);
String orderId = event.getOrderId();
orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
}
@EventHandler
public void on(OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderConfirmed();
return orderedProduct;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderShipped();
return orderedProduct;
});
}
@QueryHandler
public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
log.debug("query: {}", query);
return new ArrayList<>(orderedProducts.values());
}
}
我当然从聚合中删除了@ProcessingGroup。
我的日志:
RabbitMQSpringAMQPMessageSource : received message: ...
OrderedProductsEventHandler : event: OrderShippedEvent...
在 Axon 中,聚合不接收来自 "outside" 的事件。聚合中的事件处理程序(更具体地说,它们是 EventSourcingHandlers)仅处理由同一聚合实例发布的事件,以便它可以重建其先前状态。
只有外部事件处理程序(例如更新投影的事件处理程序)才会从外部源接收事件。
为此,您的 application.yml 应该提及 bean 名称作为处理器的来源,而不是队列名称。所以在你的第一个例子中:
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
应该变成:
eventhandling:
processors:
amqpEvents:
source: inputMessageSource
mode: subscribing
但同样,这仅适用于在组件上定义的事件处理程序,不适用于聚合。