使用 Spring 云流 Kafka Binder 在批处理功能中发布多条消息

Publish multiple messages in batch processing function with Spring cloud stream Kafka Binder

我正在寻找使用 spring 云流 kafka 活页夹(没有 Kafka Streams)创建函数式处理器的示例,它可以使用来自一个主题的一批 n 条消息并将 m 条消息发布到另一个主题(m < n)。 我尝试了以下方法:

public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
    return ((batch) -> {
        Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
            .collect(Collectors.groupingBy(result -> IncomingEvent::key));
        List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
        for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
            OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
            aggregationResults.add(
                MessageBuilder.withPayload(outgoingEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
                .build() 
            );
        }
        return aggregationResults;
    });
}

但是,这会生成一个带有消息数组的事件。我尝试将函数的 return 类型从 List 更改为 Flux 然后 returning Flux.fromIterable(aggregationResults),这似乎发布了多条消息,但是这些消息似乎是具有 scanAvailable 和 prefetch 属性的 Flux 的序列化实例,而不是实际的消息。 我在网上找不到任何实现此目的的示例。看到这样的例子会很有帮助。

我认为这不受支持;使用 Consumer<List<IncomingEvent>> 并使用 StreamBridge 发布出站消息。

https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

编辑

看来我错了;参见 https://github.com/spring-cloud/spring-cloud-stream/issues/2143

This is a documentation request. The cloud stream framework supports an undocumented feature to publish a batch of messages using a method signature like public Function<Whatever, List<Message<POJO>>> myMethod(). This results in each message in the list being published individually by the binder.

如果它不适合你,我建议你对这个问题发表评论。