Spring Cloud Stream:如何为 Kafka 主题的每个分区发送消息

Spring Cloud Stream: how to send a message for each partition of a Kafka topic

我正在尝试实现一种机制来标记一系列 kafka 消息的结束。 出于这个原因,我试图在每个主题分区的消息序列末尾发送一条特殊消息(一种 EOF)。如果消费者读取了所有 EOF 消息,则意味着他已完成对整个消息序列的处理。 有没有办法为主题的每个分区发送一条kafka消息?

编辑: 主题名称是在运行时发现的,因此分区数未知,但必须以某种方式检索。

发送 Message<?>PARTITION_ID header。

MessageBuilder.withPayload(...)
    .setHeader(KafkaHeaders.PARTITION_ID, partition)
    .build();

或者使用 partitionKeyExpression

配置生产者

https://docs.spring.io/spring-cloud-stream/docs/3.2.2/reference/html/spring-cloud-stream.html#_producer_properties