Axon 4.3 - 使用来自主题 kafka 的消息

Axon 4.3 - consume message from topic kafka

我正在使用 Axon 版本 (4.3),它在 SpringBoot Main class using

中无缝支持带有注释的 Kafka
@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)

在我的例子中,消息成功存储在主题中,但是这个消费者的问题,我无法使用主题中的消息。

你是不是配置不全?

 <dependency>
   <groupId>org.axonframework</groupId>
   <artifactId>axon-spring-boot-starter</artifactId>
   <version>4.3</version>
   <exclusions>
     <exclusion>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-server-connector</artifactId>
     </exclusion>
   </exclusions>
 </dependency>
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
   <groupId>org.axonframework.extensions.kafka</groupId>
   <artifactId>axon-kafka-spring-boot-starter</artifactId>
   <version>4.0-RC3</version>
 </dependency>

application.yml

axon:
  eventhandling:
    processors:
      conventions:
        source: kafkaMessageSource
        mode: tracking
  serializer:
    general: jackson
  kafka:
    client-id: consumer_service
    default-topic: topic_x
    bootstrap-servers:
    - 127.0.0.1:9092

Listener.java

//@Component
@ProcessingGroup(value = "conventions")
public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);

 @EventHandler
 void on(ConventionCreatedEvent event) {
  LOGGER.info("got the event {}", event);
 }

}

我认为这是您配置中的 Kafka 消息源 的名称,它现在是 Aymen 的罪魁祸首。

当使用 Axon 的自动配置和 Axon-Kafka 自动配置而没有任何关于您想要哪种类型的 Kafka 消息源的具体信息时,将创建一个 StreamableKafkaMessageSource。该 bean 的名称将是 streamableKafkaMessageSource.

在您的 application.yml 中,您期望 conventions 跟踪事件处理器的 source 被称为 kafkaMessageSource

在此旁边,您可以查看 Axon 的 Kafka 扩展中包含的 example application。也许这让事情变得更清楚了。