具有多个回复模板的 Kafka 侦听器容器
Kafka listener container with multiple reply templates
我有一个带有两个已注册处理程序的 Kafka 侦听器,每个处理程序都在侦听关于同一主题但具有不同模式类型的消息。侦听器使用 @SendTo
注释将结果转发到另一个主题,EOS 由 @Transactional
.
启用
@KafkaListener(
groupId = "groupId",
clientIdPrefix = "kafka-async-api-commands-listener",
topics = "cmd-topic",
containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<FooResponse> Foo(FooCommand command) {
FooResponse response = new FooResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<BarResponse> calculateBar(BarCommand command) {
BarResponse response = new BarResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
}
根据 docs:
In order to support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply.
KafkaTemplate 是一种参数化类型,需要提供将要生成的消息的键和值。我在想出同时支持 FooResponse
和 BarResponse
作为消息类型的模板时遇到问题。而且这似乎是必须的,因为容器工厂只接受一个模板。
由于两个模板都需要共享整个配置基础(属性、错误处理程序),我可以实例化一个类型为 KafkaTemplate<UUID, Object>
的模板,该模板能够生成具有 FooResponse
和 [=16= 的消息] 序列化body并注入容器工厂:
@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {
private final String bootstrapServers;
public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean("asyncApiResponseTemplate")
public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
}
private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {
private final String bootstrapServers;
private final KafkaTemplate<UUID, Object> kafkaTemplate;
public KafkaAsyncApiCommandsListenerContainerFactory(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
) {
super();
this.bootstrapServers = bootstrapServers;
this.kafkaTemplate = kafkaTemplate;
this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
this.setReplyTemplate(kafkaTemplate);
}
private Map<String, Object> consumerConfig() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
put(JsonDeserializer.TRUSTED_PACKAGES, "*");
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}};
}
}
我对我的方法不满意,是否是因为 hacky 通过使用 Object
作为绕过 KafkaTemplate
静态类型的方法value 参数或由于将来可能出现的潜在配置不灵活。假设我想为 FooResponse
和 BarResponse
消息提供截然不同的配置模板。差异将需要不同的生产者配置。使用具有包罗万象的 Object
值类型的单个模板,这是不可能实现的。
有什么方法可以提供多个回复模板给监听器容器,让监听器容器根据消息值类型动态选择? Spring 引导自动配置是否试图以某种方式解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许只能通过实例化两个单独的侦听器容器(附加不同的回复模板)侦听同一主题来满足此要求?如果后一种方法是正确的,我如何确保跨多个侦听器容器正确传递消息(最好使用恰好一次语义)?
Is there any way to provide more than one replying templates to the listener container to be chosen dynamically based on message value type?
没有
Does Spring Boot auto-configuration somehow attempt to address this kind of use case?
没有
然而,当 return 类型为 Message<?>
时,通用参数无关紧要。参见
/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
泛型类型仅在使用采用键 and/or 值的发送方法时才有意义。
您可以创建 KafkaTemplate
的子类并重写发送方法以调用具有适当泛型类型的委托模板,但这将毫无意义,除非您 return FooResponse
或直接 BarResponse
,而不是将它们组装成 Message<?>
.
我有一个带有两个已注册处理程序的 Kafka 侦听器,每个处理程序都在侦听关于同一主题但具有不同模式类型的消息。侦听器使用 @SendTo
注释将结果转发到另一个主题,EOS 由 @Transactional
.
@KafkaListener(
groupId = "groupId",
clientIdPrefix = "kafka-async-api-commands-listener",
topics = "cmd-topic",
containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<FooResponse> Foo(FooCommand command) {
FooResponse response = new FooResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<BarResponse> calculateBar(BarCommand command) {
BarResponse response = new BarResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
}
根据 docs:
In order to support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply.
KafkaTemplate 是一种参数化类型,需要提供将要生成的消息的键和值。我在想出同时支持 FooResponse
和 BarResponse
作为消息类型的模板时遇到问题。而且这似乎是必须的,因为容器工厂只接受一个模板。
由于两个模板都需要共享整个配置基础(属性、错误处理程序),我可以实例化一个类型为 KafkaTemplate<UUID, Object>
的模板,该模板能够生成具有 FooResponse
和 [=16= 的消息] 序列化body并注入容器工厂:
@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {
private final String bootstrapServers;
public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean("asyncApiResponseTemplate")
public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
}
private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {
private final String bootstrapServers;
private final KafkaTemplate<UUID, Object> kafkaTemplate;
public KafkaAsyncApiCommandsListenerContainerFactory(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
) {
super();
this.bootstrapServers = bootstrapServers;
this.kafkaTemplate = kafkaTemplate;
this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
this.setReplyTemplate(kafkaTemplate);
}
private Map<String, Object> consumerConfig() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
put(JsonDeserializer.TRUSTED_PACKAGES, "*");
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}};
}
}
我对我的方法不满意,是否是因为 hacky 通过使用 Object
作为绕过 KafkaTemplate
静态类型的方法value 参数或由于将来可能出现的潜在配置不灵活。假设我想为 FooResponse
和 BarResponse
消息提供截然不同的配置模板。差异将需要不同的生产者配置。使用具有包罗万象的 Object
值类型的单个模板,这是不可能实现的。
有什么方法可以提供多个回复模板给监听器容器,让监听器容器根据消息值类型动态选择? Spring 引导自动配置是否试图以某种方式解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许只能通过实例化两个单独的侦听器容器(附加不同的回复模板)侦听同一主题来满足此要求?如果后一种方法是正确的,我如何确保跨多个侦听器容器正确传递消息(最好使用恰好一次语义)?
Is there any way to provide more than one replying templates to the listener container to be chosen dynamically based on message value type?
没有
Does Spring Boot auto-configuration somehow attempt to address this kind of use case?
没有
然而,当 return 类型为 Message<?>
时,通用参数无关紧要。参见
/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
泛型类型仅在使用采用键 and/or 值的发送方法时才有意义。
您可以创建 KafkaTemplate
的子类并重写发送方法以调用具有适当泛型类型的委托模板,但这将毫无意义,除非您 return FooResponse
或直接 BarResponse
,而不是将它们组装成 Message<?>
.