从外部服务发送命令时,聚合中的 axon 3.4 CommandHandler 未被触发

axon 3.4 CommandHandler in Aggregate not being triggered when command is sent from external service

所以我试图了解 axon 3.4 中的分布式命令总线。 我有一个用例,当某个命令被发送时,聚合发送一个启动传奇的事件,这个传奇发送 2 个命令以保持数据发送到 2 个不同服务的状态一致。

现在是棘手的部分,CommandHandlers 是在外部服务中定义的,这些服务执行某些操作,然后将其中包含操作结果的命令发回。然而,当命令被发送时,我总是以超时异常结束,所以 CommandBus 知道哪个聚合必须处理它但不能将正确的聚合分配给命令。

目前 commandService.createCurrency 只记录一条消息,这就是为什么在事件处理程序中有一个 Thread.sleep,以模拟更长的 运行 过程。

下面是我的代码:

@Configuration
public class AxonConfig {

    @Autowired
    private Registration registration;

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
    }

    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }

}

服务 1

合计:

@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public CreateCurrencyAggregate(CreateCurrencyCommand command) {
        log.info("starting create currency");
        Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
        Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
        this.id = command.getId();
        apply(CreateCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build());
    }

    @CommandHandler
    public void on(DalCreatedCommand command) {
        log.info("Currency created on dal layer");
        apply(DalCurrencyCreatedEvent.builder()
                .dalId(command.getId())
                .build());

    }
}

传奇:

@Slf4j
@Saga
public class CreateCurrencySaga {

    @Autowired
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(CreateCurrencyEvent event) {
        log.info("starting saga...");
        dalCreated = false;
        as400Created = true;
        SagaLifecycle.associateWith("id", event.getId());
        SagaLifecycle.associateWith("dalId", event.getId());
        commandGateway.send(CreateDalCurrencyCommand.builder()
                .id(event.getId())
                .payload(event.getPayload())
                .build());
    }

    @SagaEventHandler(associationProperty = "dalId")
    public void handle(DalCurrencyCreatedEvent event) {
        log.info("receiving createdEvent");
        SagaLifecycle.end();
    }


}

服务 2

外部命令处理程序

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

    @Autowired
    private EventBus eventBus;

    @CommandHandler
    public void on(CreateDalCurrencyCommand command) {
        eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build()));
    }
}

事件处理程序

@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

    private final CurrencyCommandService commandService;

    private final CommandGateway commandGateway;

    @EventHandler
    public void handle(CreateDalCurrencyEvent event){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        commandService.createCurrency(event.getId(), event.getPayload());
        var result = commandGateway.send(DalCreatedCommand.builder()
            .id(event.getId())
            .build());
    }
}

我想我可以给你一些这方面的补充背景知识。

Spring 云用作发现服务的实施使世界变得不同,遗憾的是。 在内部,SpringCloudCommandRouter 使用 ServiceInstance 的元数据来共享 MessageRoutingInformation。连接到您的设置的每个应用程序都将由 ServiceInstance 表示,因此共享您作为服务可以通过这种方法处理的消息(以及命令)将很简单。

然而,当构建 SpringCloudCommandRouter 时,这是通过使用 Eureka 作为 Spring 云实现来测试的。 Eureka 允许调整 ServiceInstance 的元数据,因此我可以非常自信地说,如果您使用的是 Spring Cloud Eureka,我希望一切正常。

但是,如果您使用 Consul,那就是另一回事了。 Spring Cloud Consul 不允许调整 ServiceInstance 的元数据。我过去创建了一个 issue 来调整 API 以实际具体说明能够更新元数据。

无论如何,Axon Framework 已通过提供 SpringCloudHttpBackupCommandRouter.

因此,我建议将您的配置调整为使用 SpringCloudHttpBackupCommandRouter 而不是``SpringCloudCommandRouter`