使用 Spring Cloud Stream Kafka Binder 批量使用 Kafka 消息及其密钥
Consuming Kafka messages with its key in batches using Spring Cloud Stream Kafka Binder
在consuming them as batches时是否有可能以某种方式获取kafka消息的密钥?
我在使用 Message<String>
作为我的消费者函数的输入时设法访问了消息键,但这只适用于非批处理模式:
@SpringBootApplication
class KafkaSink {
private val log = logger()
@Bean
fun sink() : Consumer<Message<String>> {
return Consumer {
log.info("key: ${it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]} value: ${it.payload}")
}
}
}
当设置 属性 spring.cloud.stream.binding.sink.consumer.batch-mode=true
时,我只能使用 List<String>
作为消费者的参数,而不能使用 List<Message<String>>
使用Message<List<String>>
;然后,KafkaHeaders.RECEIVED_MESSAGE_KEY
是一个 List<?>
,与有效载荷的顺序相同 - 所有其他 headers.
也是如此
编辑
在下方重新评论;默认 content-type 是 application/json
。添加
bindings:
sink-in-0:
content-type: text/plain
修复你的测试。
不幸的是,它在任何地方都没有提到:-(,但是将输入事件包装到org.springframework.messaging.Message
中(例如获取记录密钥),这是批处理时正确的签名:
@Bean
public Consumer<Message<List<String>>> sink() {
return messages -> {
List<?> keys = messages.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class);
log.info("headers {}", messages.getHeaders());
keys.forEach(k -> log.info("key {}", k));
messages.getPayload().forEach(string -> log.info("processing {}", string));
};
}
您注意到签名是相反的:Message<List<?>>
而不是 List<Message<?>>
!
在consuming them as batches时是否有可能以某种方式获取kafka消息的密钥?
我在使用 Message<String>
作为我的消费者函数的输入时设法访问了消息键,但这只适用于非批处理模式:
@SpringBootApplication
class KafkaSink {
private val log = logger()
@Bean
fun sink() : Consumer<Message<String>> {
return Consumer {
log.info("key: ${it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]} value: ${it.payload}")
}
}
}
当设置 属性 spring.cloud.stream.binding.sink.consumer.batch-mode=true
时,我只能使用 List<String>
作为消费者的参数,而不能使用 List<Message<String>>
使用Message<List<String>>
;然后,KafkaHeaders.RECEIVED_MESSAGE_KEY
是一个 List<?>
,与有效载荷的顺序相同 - 所有其他 headers.
编辑
在下方重新评论;默认 content-type 是 application/json
。添加
bindings:
sink-in-0:
content-type: text/plain
修复你的测试。
不幸的是,它在任何地方都没有提到:-(,但是将输入事件包装到org.springframework.messaging.Message
中(例如获取记录密钥),这是批处理时正确的签名:
@Bean
public Consumer<Message<List<String>>> sink() {
return messages -> {
List<?> keys = messages.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class);
log.info("headers {}", messages.getHeaders());
keys.forEach(k -> log.info("key {}", k));
messages.getPayload().forEach(string -> log.info("processing {}", string));
};
}
您注意到签名是相反的:Message<List<?>>
而不是 List<Message<?>>
!