Spring Cloud Stream:如何为 Kafka 主题的每个分区发送消息
Spring Cloud Stream: how to send a message for each partition of a Kafka topic
我正在尝试实现一种机制来标记一系列 kafka 消息的结束。
出于这个原因,我试图在每个主题分区的消息序列末尾发送一条特殊消息(一种 EOF)。如果消费者读取了所有 EOF 消息,则意味着他已完成对整个消息序列的处理。
有没有办法为主题的每个分区发送一条kafka消息?
编辑:
主题名称是在运行时发现的,因此分区数未知,但必须以某种方式检索。
发送 Message<?>
和 PARTITION_ID
header。
MessageBuilder.withPayload(...)
.setHeader(KafkaHeaders.PARTITION_ID, partition)
.build();
或者使用 partitionKeyExpression
配置生产者
我正在尝试实现一种机制来标记一系列 kafka 消息的结束。 出于这个原因,我试图在每个主题分区的消息序列末尾发送一条特殊消息(一种 EOF)。如果消费者读取了所有 EOF 消息,则意味着他已完成对整个消息序列的处理。 有没有办法为主题的每个分区发送一条kafka消息?
编辑: 主题名称是在运行时发现的,因此分区数未知,但必须以某种方式检索。
发送 Message<?>
和 PARTITION_ID
header。
MessageBuilder.withPayload(...)
.setHeader(KafkaHeaders.PARTITION_ID, partition)
.build();
或者使用 partitionKeyExpression