如何在 SCDF 任务中使用 KafkaItemReader 获取完整消息?

How to get full Message with KafkaItemReader in SCDF Task?

我正在尝试创建一个 SCDF Task 来处理错误,但我无法弄清楚如何使用有效负载和 headers.

获取完整的 kafka 消息

我的想法是在服务未响应时将消息路由到我的流中的 DLQ。例如,某些 HTTP 服务已关闭并且 httclient 应用程序失败。

当 HTTP 服务恢复后,我想 运行 一个任务,它接收 DLQ 中的消息并将它们重新发送到正确的 Kafka 主题,无论消息是什么。

我正在尝试执行通用任务,因此 DLQ 和目标主题是 Kafka 消费者和生产者属性。 我也想使用通用 org.springframework.messaging.Message

当我使用 KafkaItemReader<String, String>KafkaItemWriter<String, String> 时,它可以正常工作,只有作为字符串的有效负载,但所有 headers 都丢失了。当我使用 KafkaItemReader<String, Message<?>>KafkaItemWriter<String, Message<?>> 也得到 headers 时,我有一个 ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message

2020-11-13T14:27:03.472446462+01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493+01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463+01:00 stdout F     at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814+01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712+01:00 stdout F    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956+01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765+01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]

有办法吗?

事实上,似乎无法通过 KafkaItemReaderKafkaItemWriter 获取消息 headers。 Serializer/deserializer 用于密钥和有效负载,但我找不到获取 headers 的方法。

我通过使用 Tasklet 而不是 KafkaItemReaderKafkaItemWriter 解决了这个问题。在我的 Tasklet 中,我使用 KafkaConsumerKafkaProducer 来处理 ConsumerRecordProducerRecord 这允许我复制 headers.

此外,我可以更恰当地处理提交(无自动提交):只有当消息由生产者发送时,消费者偏移才会提交。