如何在 SCDF 任务中使用 KafkaItemReader 获取完整消息?
How to get full Message with KafkaItemReader in SCDF Task?
我正在尝试创建一个 SCDF Task
来处理错误,但我无法弄清楚如何使用有效负载和 headers.
获取完整的 kafka 消息
我的想法是在服务未响应时将消息路由到我的流中的 DLQ。例如,某些 HTTP 服务已关闭并且 httclient 应用程序失败。
当 HTTP 服务恢复后,我想 运行 一个任务,它接收 DLQ 中的消息并将它们重新发送到正确的 Kafka 主题,无论消息是什么。
我正在尝试执行通用任务,因此 DLQ 和目标主题是 Kafka 消费者和生产者属性。
我也想使用通用 org.springframework.messaging.Message
。
当我使用 KafkaItemReader<String, String>
和 KafkaItemWriter<String, String>
时,它可以正常工作,只有作为字符串的有效负载,但所有 headers 都丢失了。当我使用 KafkaItemReader<String, Message<?>>
和 KafkaItemWriter<String, Message<?>>
也得到 headers 时,我有一个 ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472446462+01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712+01:00 stdout F at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
有办法吗?
事实上,似乎无法通过 KafkaItemReader
和 KafkaItemWriter
获取消息 headers。 Serializer/deserializer 用于密钥和有效负载,但我找不到获取 headers 的方法。
我通过使用 Tasklet
而不是 KafkaItemReader
和 KafkaItemWriter
解决了这个问题。在我的 Tasklet 中,我使用 KafkaConsumer
和 KafkaProducer
来处理 ConsumerRecord
和 ProducerRecord
这允许我复制 headers.
此外,我可以更恰当地处理提交(无自动提交):只有当消息由生产者发送时,消费者偏移才会提交。
我正在尝试创建一个 SCDF Task
来处理错误,但我无法弄清楚如何使用有效负载和 headers.
我的想法是在服务未响应时将消息路由到我的流中的 DLQ。例如,某些 HTTP 服务已关闭并且 httclient 应用程序失败。
当 HTTP 服务恢复后,我想 运行 一个任务,它接收 DLQ 中的消息并将它们重新发送到正确的 Kafka 主题,无论消息是什么。
我正在尝试执行通用任务,因此 DLQ 和目标主题是 Kafka 消费者和生产者属性。
我也想使用通用 org.springframework.messaging.Message
。
当我使用 KafkaItemReader<String, String>
和 KafkaItemWriter<String, String>
时,它可以正常工作,只有作为字符串的有效负载,但所有 headers 都丢失了。当我使用 KafkaItemReader<String, Message<?>>
和 KafkaItemWriter<String, Message<?>>
也得到 headers 时,我有一个 ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472446462+01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712+01:00 stdout F at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
有办法吗?
事实上,似乎无法通过 KafkaItemReader
和 KafkaItemWriter
获取消息 headers。 Serializer/deserializer 用于密钥和有效负载,但我找不到获取 headers 的方法。
我通过使用 Tasklet
而不是 KafkaItemReader
和 KafkaItemWriter
解决了这个问题。在我的 Tasklet 中,我使用 KafkaConsumer
和 KafkaProducer
来处理 ConsumerRecord
和 ProducerRecord
这允许我复制 headers.
此外,我可以更恰当地处理提交(无自动提交):只有当消息由生产者发送时,消费者偏移才会提交。