Axon Kafka 集成 - 使用自定义 Kafka ProducerFactory
Axon Kafka Integration - Using a custom Kafka ProducerFactory
我正在使用来自 GitHub 的 Kafka 示例(https://github.com/marinkobabic/axon-kafka-example),在连接到本地部署的 Kafka 集群时它可以正常工作。按照 Axon 文档的建议,我正在尝试通过公开一个 KafkaPublisher bean 并覆盖 withProducerFactory() 来使用我自己的 ProducerFactory。我的 bean 已创建,但是我的自定义 Kafka ProducerFactory 的 createProducer() 永远不会被调用。事实上,仍在使用的是 Axon 的 DefaultProducerFactory。有什么建议吗?
/**
 * Question snippet: registers a {@code KafkaPublisher} bean built from a configuration that
 * uses a newly created {@code SimpleEventBus} as message source, an anonymous Axon
 * {@code ProducerFactory} delegating to the injected {@code factory}, and {@code topic}
 * (declared outside this snippet). The publisher is started before it is returned.
 *
 * @param factory the producer factory whose {@code createProducer()} the wrapper delegates to
 * @return the started publisher
 */
@Bean
KafkaPublisher<byte[], byte[]> kafkaPublisher(ProducerFactory factory) {
// NOTE(review): the builder is typed <String, byte[]> while the bean and the publisher below
// are <byte[], byte[]>; this only lines up because `configuration` is a raw type.
KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
// NOTE(review): this event bus instance is created here and not exposed as a bean;
// presumably nothing else publishes to it — confirm.
.withMessageSource(new SimpleEventBus())
.withProducerFactory(new org.axonframework.kafka.eventhandling.producer.ProducerFactory<String, byte[]>() {
// Hands out producers obtained from the injected factory.
@Override
public Producer<String, byte[]> createProducer() {
return factory.createProducer();
}
// No-op: this wrapper does nothing on shutdown.
@Override
public void shutDown() {
}
})
.withTopic(topic)
.build();
KafkaPublisher<byte[], byte[]> publisher = new KafkaPublisher<>(configuration);
publisher.start();
return publisher;
}
我通过保留 KafkaPublisher 解决了我的问题,但我需要公开一个 EventBus bean(我在 Sender.java 中使用它)并将发布者连接到它。我还需要公开我自己的 Axon ProducerFactory,它包装了我自己的 Kafka ProducerFactory。我的最终配置如下:
/**
 * Spring configuration that makes Axon's Kafka publisher obtain its producers from the
 * application's own Kafka {@code ProducerFactory}.
 *
 * <p>It exposes three beans: an Axon {@code ProducerFactory} wrapping the injected Kafka
 * factory, a {@code SimpleEventBus} named {@code "event-bus"}, and a {@code KafkaPublisher}
 * wired to both. All Kafka generics are consistently {@code <byte[], byte[]>}.
 */
@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
class AxonConfig {

    /** Topic the publisher is configured with, read from {@code axon.kafka.default-topic}. */
    @Value("${axon.kafka.default-topic}")
    private String topic;

    /**
     * Exposes an Axon {@code ProducerFactory} that delegates producer creation to the injected
     * Kafka {@code ProducerFactory}.
     *
     * @param factory the Kafka producer factory to delegate to; kept as a raw type so the
     *                injection point is unchanged
     * @return an Axon producer factory whose producers come from {@code factory}
     */
    @Bean
    public org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> producerFactory(ProducerFactory factory) {
        return new org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]>() {
            // The injected factory is raw, so its producer is converted unchecked here.
            // NOTE(review): assumes the underlying factory is configured with byte[]
            // key/value serializers — confirm.
            @SuppressWarnings("unchecked")
            @Override
            public Producer<byte[], byte[]> createProducer() {
                return factory.createProducer();
            }

            // No-op: this wrapper does nothing on shutdown.
            // NOTE(review): presumably the wrapped factory's lifecycle is managed elsewhere — confirm.
            @Override
            public void shutDown() {
            }
        };
    }

    /**
     * Exposes the event bus that the Kafka publisher is attached to.
     *
     * @return a new {@code SimpleEventBus}, registered under the bean name {@code "event-bus"}
     */
    @Bean("event-bus")
    EventBus eventBus() {
        return new SimpleEventBus();
    }

    /**
     * Builds, subscribes and starts the Kafka publisher.
     *
     * @param factory  the Axon producer factory the publisher obtains its producers from
     * @param eventBus the event bus used as the publisher's message source
     * @return the started publisher
     */
    @Bean
    KafkaPublisher<byte[], byte[]> kafkaPublisher(
            org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> factory,
            EventBus eventBus) {
        // Key/value types match the producer factory and the publisher (<byte[], byte[]>);
        // the previous <String, byte[]> builder only compiled because raw types hid the mismatch.
        KafkaPublisherConfiguration<byte[], byte[]> configuration =
                KafkaPublisherConfiguration.<byte[], byte[]>builder()
                        .withMessageSource(eventBus)
                        .withProducerFactory(factory)
                        .withTopic(topic)
                        .build();
        MyKafkaPublisher<byte[], byte[]> publisher = new MyKafkaPublisher<>(configuration);
        // NOTE(review): the bus is also passed as the message source above; if start() subscribes
        // to that source as well, events could be sent twice — confirm against MyKafkaPublisher.
        eventBus.subscribe((events) -> publisher.sendEvents(events));
        publisher.start();
        return publisher;
    }
}
我正在使用来自 GitHub 的 Kafka 示例(https://github.com/marinkobabic/axon-kafka-example),在连接到本地部署的 Kafka 集群时它可以正常工作。按照 Axon 文档的建议,我正在尝试通过公开一个 KafkaPublisher bean 并覆盖 withProducerFactory() 来使用我自己的 ProducerFactory。我的 bean 已创建,但是我的自定义 Kafka ProducerFactory 的 createProducer() 永远不会被调用。事实上,仍在使用的是 Axon 的 DefaultProducerFactory。有什么建议吗?
/**
 * Question snippet: registers a {@code KafkaPublisher} bean built from a configuration that
 * uses a newly created {@code SimpleEventBus} as message source, an anonymous Axon
 * {@code ProducerFactory} delegating to the injected {@code factory}, and {@code topic}
 * (declared outside this snippet). The publisher is started before it is returned.
 *
 * @param factory the producer factory whose {@code createProducer()} the wrapper delegates to
 * @return the started publisher
 */
@Bean
KafkaPublisher<byte[], byte[]> kafkaPublisher(ProducerFactory factory) {
// NOTE(review): the builder is typed <String, byte[]> while the bean and the publisher below
// are <byte[], byte[]>; this only lines up because `configuration` is a raw type.
KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
// NOTE(review): this event bus instance is created here and not exposed as a bean;
// presumably nothing else publishes to it — confirm.
.withMessageSource(new SimpleEventBus())
.withProducerFactory(new org.axonframework.kafka.eventhandling.producer.ProducerFactory<String, byte[]>() {
// Hands out producers obtained from the injected factory.
@Override
public Producer<String, byte[]> createProducer() {
return factory.createProducer();
}
// No-op: this wrapper does nothing on shutdown.
@Override
public void shutDown() {
}
})
.withTopic(topic)
.build();
KafkaPublisher<byte[], byte[]> publisher = new KafkaPublisher<>(configuration);
publisher.start();
return publisher;
}
我通过保留 KafkaPublisher 解决了我的问题,但我需要公开一个 EventBus bean(我在 Sender.java 中使用它)并将发布者连接到它。我还需要公开我自己的 Axon ProducerFactory,它包装了我自己的 Kafka ProducerFactory。我的最终配置如下:
/**
 * Spring configuration that makes Axon's Kafka publisher obtain its producers from the
 * application's own Kafka {@code ProducerFactory}.
 *
 * <p>It exposes three beans: an Axon {@code ProducerFactory} wrapping the injected Kafka
 * factory, a {@code SimpleEventBus} named {@code "event-bus"}, and a {@code KafkaPublisher}
 * wired to both. All Kafka generics are consistently {@code <byte[], byte[]>}.
 */
@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
class AxonConfig {

    /** Topic the publisher is configured with, read from {@code axon.kafka.default-topic}. */
    @Value("${axon.kafka.default-topic}")
    private String topic;

    /**
     * Exposes an Axon {@code ProducerFactory} that delegates producer creation to the injected
     * Kafka {@code ProducerFactory}.
     *
     * @param factory the Kafka producer factory to delegate to; kept as a raw type so the
     *                injection point is unchanged
     * @return an Axon producer factory whose producers come from {@code factory}
     */
    @Bean
    public org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> producerFactory(ProducerFactory factory) {
        return new org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]>() {
            // The injected factory is raw, so its producer is converted unchecked here.
            // NOTE(review): assumes the underlying factory is configured with byte[]
            // key/value serializers — confirm.
            @SuppressWarnings("unchecked")
            @Override
            public Producer<byte[], byte[]> createProducer() {
                return factory.createProducer();
            }

            // No-op: this wrapper does nothing on shutdown.
            // NOTE(review): presumably the wrapped factory's lifecycle is managed elsewhere — confirm.
            @Override
            public void shutDown() {
            }
        };
    }

    /**
     * Exposes the event bus that the Kafka publisher is attached to.
     *
     * @return a new {@code SimpleEventBus}, registered under the bean name {@code "event-bus"}
     */
    @Bean("event-bus")
    EventBus eventBus() {
        return new SimpleEventBus();
    }

    /**
     * Builds, subscribes and starts the Kafka publisher.
     *
     * @param factory  the Axon producer factory the publisher obtains its producers from
     * @param eventBus the event bus used as the publisher's message source
     * @return the started publisher
     */
    @Bean
    KafkaPublisher<byte[], byte[]> kafkaPublisher(
            org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> factory,
            EventBus eventBus) {
        // Key/value types match the producer factory and the publisher (<byte[], byte[]>);
        // the previous <String, byte[]> builder only compiled because raw types hid the mismatch.
        KafkaPublisherConfiguration<byte[], byte[]> configuration =
                KafkaPublisherConfiguration.<byte[], byte[]>builder()
                        .withMessageSource(eventBus)
                        .withProducerFactory(factory)
                        .withTopic(topic)
                        .build();
        MyKafkaPublisher<byte[], byte[]> publisher = new MyKafkaPublisher<>(configuration);
        // NOTE(review): the bus is also passed as the message source above; if start() subscribes
        // to that source as well, events could be sent twice — confirm against MyKafkaPublisher.
        eventBus.subscribe((events) -> publisher.sendEvents(events));
        publisher.start();
        return publisher;
    }
}