使用 Spring 云流 Kafka Binder 在批处理功能中发布多条消息
Publish multiple messages in batch processing function with Spring cloud stream Kafka Binder
我正在寻找使用 spring 云流 kafka 活页夹(没有 Kafka Streams)创建函数式处理器的示例,它可以使用来自一个主题的一批 n 条消息并将 m 条消息发布到另一个主题(m < n)。
我尝试了以下方法:
public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
return ((batch) -> {
Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
.collect(Collectors.groupingBy(result -> IncomingEvent::key));
List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
aggregationResults.add(
MessageBuilder.withPayload(outgoingEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
.build()
);
}
return aggregationResults;
});
}
但是,这会生成一个带有消息数组的事件。我尝试将函数的 return 类型从 List 更改为 Flux 然后 returning Flux.fromIterable(aggregationResults),这似乎发布了多条消息,但是这些消息似乎是具有 scanAvailable 和 prefetch 属性的 Flux 的序列化实例,而不是实际的消息。
我在网上找不到任何实现此目的的示例。看到这样的例子会很有帮助。
我认为这不受支持;使用 Consumer<List<IncomingEvent>>
并使用 StreamBridge
发布出站消息。
编辑
看来我错了;参见 https://github.com/spring-cloud/spring-cloud-stream/issues/2143
This is a documentation request. The cloud stream framework supports an undocumented feature to publish a batch of messages using a method signature like public Function<Whatever, List<Message<POJO>>> myMethod()
. This results in each message in the list being published individually by the binder.
如果它不适合你,我建议你对这个问题发表评论。
我正在寻找使用 spring 云流 kafka 活页夹(没有 Kafka Streams)创建函数式处理器的示例,它可以使用来自一个主题的一批 n 条消息并将 m 条消息发布到另一个主题(m < n)。 我尝试了以下方法:
public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
return ((batch) -> {
Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
.collect(Collectors.groupingBy(result -> IncomingEvent::key));
List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
aggregationResults.add(
MessageBuilder.withPayload(outgoingEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
.build()
);
}
return aggregationResults;
});
}
但是,这会生成一个带有消息数组的事件。我尝试将函数的 return 类型从 List
我认为这不受支持;使用 Consumer<List<IncomingEvent>>
并使用 StreamBridge
发布出站消息。
编辑
看来我错了;参见 https://github.com/spring-cloud/spring-cloud-stream/issues/2143
This is a documentation request. The cloud stream framework supports an undocumented feature to publish a batch of messages using a method signature like
public Function<Whatever, List<Message<POJO>>> myMethod()
. This results in each message in the list being published individually by the binder.
如果它不适合你,我建议你对这个问题发表评论。