Axon 4.3 - 使用来自主题 kafka 的消息
Axon 4.3 - consume message from topic kafka
我正在使用 Axon 版本 (4.3),它在 SpringBoot Main class using
中无缝支持带有注释的 Kafka
@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)
在我的例子中,消息成功存储在主题中,但是这个消费者的问题,我无法使用主题中的消息。
你是不是配置不全?
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.3</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>4.0-RC3</version>
</dependency>
application.yml
axon:
eventhandling:
processors:
conventions:
source: kafkaMessageSource
mode: tracking
serializer:
general: jackson
kafka:
client-id: consumer_service
default-topic: topic_x
bootstrap-servers:
- 127.0.0.1:9092
Listener.java
//@Component
@ProcessingGroup(value = "conventions")
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);
@EventHandler
void on(ConventionCreatedEvent event) {
LOGGER.info("got the event {}", event);
}
}
我认为这是您配置中的 Kafka 消息源 的名称,它现在是 Aymen 的罪魁祸首。
当使用 Axon 的自动配置和 Axon-Kafka 自动配置而没有任何关于您想要哪种类型的 Kafka 消息源的具体信息时,将创建一个 StreamableKafkaMessageSource
。该 bean 的名称将是 streamableKafkaMessageSource
.
在您的 application.yml
中,您期望 conventions
跟踪事件处理器的 source
被称为 kafkaMessageSource
。
在此旁边,您可以查看 Axon 的 Kafka 扩展中包含的 example application。也许这让事情变得更清楚了。
我正在使用 Axon 版本 (4.3),它在 SpringBoot Main class using
中无缝支持带有注释的 Kafka@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)
在我的例子中,消息成功存储在主题中,但是这个消费者的问题,我无法使用主题中的消息。
你是不是配置不全?
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.3</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>4.0-RC3</version>
</dependency>
application.yml
axon:
eventhandling:
processors:
conventions:
source: kafkaMessageSource
mode: tracking
serializer:
general: jackson
kafka:
client-id: consumer_service
default-topic: topic_x
bootstrap-servers:
- 127.0.0.1:9092
Listener.java
//@Component
@ProcessingGroup(value = "conventions")
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);
@EventHandler
void on(ConventionCreatedEvent event) {
LOGGER.info("got the event {}", event);
}
}
我认为这是您配置中的 Kafka 消息源 的名称,它现在是 Aymen 的罪魁祸首。
当使用 Axon 的自动配置和 Axon-Kafka 自动配置而没有任何关于您想要哪种类型的 Kafka 消息源的具体信息时,将创建一个 StreamableKafkaMessageSource
。该 bean 的名称将是 streamableKafkaMessageSource
.
在您的 application.yml
中,您期望 conventions
跟踪事件处理器的 source
被称为 kafkaMessageSource
。
在此旁边,您可以查看 Axon 的 Kafka 扩展中包含的 example application。也许这让事情变得更清楚了。