在 spring-cloud-streams 中批量处理传入的有效负载未按预期工作

Processing incoming payloads as batch not working as expected in spring-cloud-streams

我说 'not working as expected' 但实际上更像是“我真的不知道我是否在这里做适当的工作”,我觉得我正在混合不同方法的东西,但不是'真的没有关联。

现在我一直在使用 Spring Cloud Streams 来处理来自 PubSub 订阅的字符串类型的消息,到目前为止一切顺利,消息输入消息输出没有太多麻烦。

我现在想要实现的是收集,比方说,1000 条消息处理它们并将它们一起发送到另一个 PubSub 主题。仍然不确定是将它们作为列表发送还是像现在一样单独发送,但同时发送(尽管这不应该与这个问题相关)。

现在我刚刚发现了以下内容属性。

spring.cloud.stream.bindings.input.consumer.batch-mode=true

连同以下更具体的 GCP 内容。

spring.cloud.gcp.pubsub.publisher.batching.enabled=true
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds=300
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold=100

所以第一个问题是... 它们有任何联系吗?我必须把第一个和其他三个放在一起吗?

我将之前的属性添加到我的 application.properties 文件后发生的事情实际上没有任何变化。消息不断到达和离开应用程序,没有任何问题,也没有任何批处理方法。

目前正在按以下方式使用功能特性。

    @Bean
    public Function<Message<String>, String> sampleFunction() {

       ... // Stream processing in here

       return processedString;

    }

我原以为这会因某些消息而崩溃,因为该方法只接收一个字符串,而不是一个字符串列表。由于它没有崩溃,我修改了上面的方法以接收一个 String 列表(也许 Spring 在幕后做了一些魔术仍然以 String 形式接收消息,但将它们收集在一个列表中供方法稍后处理? ).

    @Bean
    public Function<Message<List<String>>, String> sampleFunction() {

       ... // Stream processing in here

       return processedString;

    }

但这只是崩溃,因为它试图将单个字符串消息解析为字符串列表。

我如何准备代码以将所有这些字符串消息批处理到一个列表中?有这方面的例子吗?

...batch-mode 仅适用于支持它的绑定器(例如 Kafka、RabbitMQ)。它看起来不像 GCP 活页夹支持它(我没有看到对 属性 的引用)。

https://github.com/spring-cloud/spring-cloud-gcp/blob/master/spring-cloud-gcp-pubsub-stream-binder/src/main/java/org/springframework/cloud/gcp/stream/binder/pubsub/PubSubMessageChannelBinder.java

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_batch_consumers

发布者批处理与消费者批处理无关。