有没有办法在 spring-kafka 中配置自定义 RecordMessageConverter?
Is there a way to configure custom RecordMessageConverter in spring-kafka?
我创建了一个自定义 RecordMessageConverter
,它在 toMessage
和 fromMessage
中打印(我按照 https://www.confluent.io/blog/spring-for-apache-kafka-deep-dive-part-1-error-handling-message-conversion-transaction-support 中的说明创建了 Bean),但不幸的是它没有绑定到听众。我可以显式绑定它还是我应该做其他事情让它起作用?
public class MobileMessageConverter implements RecordMessageConverter {
@Override
public Message<?> toMessage(ConsumerRecord<?,?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type payloadType) {
System.out.println("Converting toMessage");
return null;
}
@Override
public ProducerRecord<CustomKey, CustomValue> fromMessage(Message<?> message, String defaultTopic) {
System.out.println("Converting fromMessage");
return new ProducerRecord("UNKNOWN",new CustomValue());
}
}
@RestController
public class CustomKafkaController {
@Bean
public RecordMessageConverter converter() {
return new CustomMessageConverter();
}
@KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerContainerFactory", groupId = "1")
public void listenAsObject(ConsumerRecord<CustomKey, CustomValue> cr) {
System.out.println(cr.getValue().getCustomMessage());
}
}
我解决了!
- 我通过 setter.
将转换器添加到工厂 Bean (kafkaListenerContainerFactory
)
- 我更改了转换以实现
MessagingMessageConverter
而不是 RecordMessageConverter
并且我实现了方法 extractAndConvertValue
- (如果您想更改为其他类型) 删除
listenAsObject
并将其替换为 ConsumerRecordtype
您的转换为
例如:
如果要将自定义值更改为字符串:
创建新转换器:
public class CustomToStringMessageConverter extends MessagingMessageConverter {
@Override
protected String extractAndConvertValue(ConsumerRecord<?, ?> record, Type type){
return "EmptyString"
}
}
将侦听器更改为:
@KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerStringContainerFactory", groupId = "1")
public void listen2(String cr) {
**DO SOMETHING WITH THE CONVERTED CR**
}
最后一件事是通过以下方式获取转换后的 Bean 并将其设置在您的工厂中:
factory.setMessageConverter(new CustomToStringMessageConverter ());
我创建了一个自定义 RecordMessageConverter
,它在 toMessage
和 fromMessage
中打印(我按照 https://www.confluent.io/blog/spring-for-apache-kafka-deep-dive-part-1-error-handling-message-conversion-transaction-support 中的说明创建了 Bean),但不幸的是它没有绑定到听众。我可以显式绑定它还是我应该做其他事情让它起作用?
public class MobileMessageConverter implements RecordMessageConverter {
@Override
public Message<?> toMessage(ConsumerRecord<?,?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type payloadType) {
System.out.println("Converting toMessage");
return null;
}
@Override
public ProducerRecord<CustomKey, CustomValue> fromMessage(Message<?> message, String defaultTopic) {
System.out.println("Converting fromMessage");
return new ProducerRecord("UNKNOWN",new CustomValue());
}
}
@RestController
public class CustomKafkaController {
@Bean
public RecordMessageConverter converter() {
return new CustomMessageConverter();
}
@KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerContainerFactory", groupId = "1")
public void listenAsObject(ConsumerRecord<CustomKey, CustomValue> cr) {
System.out.println(cr.getValue().getCustomMessage());
}
}
我解决了!
- 我通过 setter.
- 我更改了转换以实现
MessagingMessageConverter
而不是RecordMessageConverter
并且我实现了方法extractAndConvertValue
- (如果您想更改为其他类型) 删除
listenAsObject
并将其替换为ConsumerRecordtype
您的转换为
kafkaListenerContainerFactory
)
例如:
如果要将自定义值更改为字符串:
创建新转换器:
public class CustomToStringMessageConverter extends MessagingMessageConverter {
@Override
protected String extractAndConvertValue(ConsumerRecord<?, ?> record, Type type){
return "EmptyString"
}
}
将侦听器更改为:
@KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerStringContainerFactory", groupId = "1")
public void listen2(String cr) {
**DO SOMETHING WITH THE CONVERTED CR**
}
最后一件事是通过以下方式获取转换后的 Bean 并将其设置在您的工厂中:
factory.setMessageConverter(new CustomToStringMessageConverter ());