KafkaHandler for multiple JSON types in Single Kafka Topic - Process in batch
KafkaHandler for multiple JSON types in Single Kafka Topic - Process in batch
TL/DR;是否可以在批处理模式下为不同的 JSON 类型使用单独的 KafkaHandlers?
我正在使用一个主题,其中包含多个不同的 JSON 消息。我正在处理数据并将其插入数据库,因此我正在批量处理这些数据,并在所有内容都已插入数据库后手动执行 Kafka 提交。
所以我有我的工厂,其中包括
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
和出厂设置
factory.setBatchListener(true);
为了测试,我有两个 POJO,它们是带有字符串 var1 和 var2 的 TypeA 和带有字符串(var3 和 var4)的 TypeB。然后我的消费者有一个方法,本质上是:
@KafkaListener(topics = "${kafka.topics}");
public void receive (List<Object> data, Acknowledgement ack) {
for (int i = 0; i < data.size(); i++) {
Object d = data.get(i);
if (d instanceof TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
if (d instancef TypeB) {
LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), a.getVar4());
}
}
ack.acknowledge();
}
这行得通,但我一直在尝试使用 KafkaHandlers 为每种类型工作,而不是使用 instanceof。
如果我删除启用批处理的行并将 KafkaListener
注释移动到 class 级别,我就可以创建单独的处理程序
@KafkaHandler
public void receiveA(@Payload TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
@KafkaHandler
public void receiveB(@Payload TypeA) {
LOGGER.info("We have Type B - '{}' - '{}'", a.getVar3(), a.getVar4());
}
这很好用,但我失去了批处理能力。
如果我启用批处理模式,那么它似乎只需要 ArrayList 的处理程序,而您不能为不同的类型设置单独的处理程序。
这里有中间立场吗?有没有什么方法可以使用 KafkaHandler
处理单个记录,但是一旦所有记录都被它们的处理程序处理过(以处理确认和数据库提交)就会触发一些东西,或者有比这更好的处理方法吗?在第一个代码中使用 lots if instance of statements?
@KafkaHandler
目前不支持批量监听; 请打开一个GitHub issue - 我们应该能够从泛型参数类型中正确检测出泛型列表内容类型。
您可以使用自定义 BatchToRecordAdapter
来调用记录级侦听器,并在批处理的最后一条消息中设置一些标志以表明它是最后一条消息。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions-batch
编辑
支持 @KafkaHandler
没有意义 - 批处理可能包含混合类型。
TL/DR;是否可以在批处理模式下为不同的 JSON 类型使用单独的 KafkaHandlers?
我正在使用一个主题,其中包含多个不同的 JSON 消息。我正在处理数据并将其插入数据库,因此我正在批量处理这些数据,并在所有内容都已插入数据库后手动执行 Kafka 提交。
所以我有我的工厂,其中包括
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
和出厂设置
factory.setBatchListener(true);
为了测试,我有两个 POJO,它们是带有字符串 var1 和 var2 的 TypeA 和带有字符串(var3 和 var4)的 TypeB。然后我的消费者有一个方法,本质上是:
@KafkaListener(topics = "${kafka.topics}");
public void receive (List<Object> data, Acknowledgement ack) {
for (int i = 0; i < data.size(); i++) {
Object d = data.get(i);
if (d instanceof TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
if (d instancef TypeB) {
LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), a.getVar4());
}
}
ack.acknowledge();
}
这行得通,但我一直在尝试使用 KafkaHandlers 为每种类型工作,而不是使用 instanceof。
如果我删除启用批处理的行并将 KafkaListener
注释移动到 class 级别,我就可以创建单独的处理程序
@KafkaHandler
public void receiveA(@Payload TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
@KafkaHandler
public void receiveB(@Payload TypeA) {
LOGGER.info("We have Type B - '{}' - '{}'", a.getVar3(), a.getVar4());
}
这很好用,但我失去了批处理能力。
如果我启用批处理模式,那么它似乎只需要 ArrayList 的处理程序,而您不能为不同的类型设置单独的处理程序。
这里有中间立场吗?有没有什么方法可以使用 KafkaHandler
处理单个记录,但是一旦所有记录都被它们的处理程序处理过(以处理确认和数据库提交)就会触发一些东西,或者有比这更好的处理方法吗?在第一个代码中使用 lots if instance of statements?
@KafkaHandler
目前不支持批量监听; 请打开一个GitHub issue - 我们应该能够从泛型参数类型中正确检测出泛型列表内容类型。
您可以使用自定义 BatchToRecordAdapter
来调用记录级侦听器,并在批处理的最后一条消息中设置一些标志以表明它是最后一条消息。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions-batch
编辑
支持 @KafkaHandler
没有意义 - 批处理可能包含混合类型。