在 spring-cloud-streams 中批量处理传入的有效负载未按预期工作
Processing incoming payloads as batch not working as expected in spring-cloud-streams
我说 'not working as expected' 但实际上更像是“我真的不知道我是否在这里做适当的工作”,我觉得我正在混合不同方法的东西,但不是'真的没有关联。
现在我一直在使用 Spring Cloud Streams 来处理来自 PubSub 订阅的字符串类型的消息,到目前为止一切顺利,消息输入消息输出没有太多麻烦。
我现在想要实现的是收集,比方说,1000 条消息,处理它们并将它们一起发送到另一个 PubSub 主题。仍然不确定是将它们作为列表发送还是像现在一样单独发送,但同时发送(尽管这不应该与这个问题相关)。
现在我刚刚发现了以下内容属性。
spring.cloud.stream.bindings.input.consumer.batch-mode=true
连同以下更具体的 GCP 内容。
spring.cloud.gcp.pubsub.publisher.batching.enabled=true
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds=300
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold=100
所以第一个问题是... 它们有任何联系吗?我必须把第一个和其他三个放在一起吗?
我将之前的属性添加到我的 application.properties 文件后发生的事情实际上没有任何变化。消息不断到达和离开应用程序,没有任何问题,也没有任何批处理方法。
目前正在按以下方式使用功能特性。
@Bean
public Function<Message<String>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
我原以为这会因某些消息而崩溃,因为该方法只接收一个字符串,而不是一个字符串列表。由于它没有崩溃,我修改了上面的方法以接收一个 String 列表(也许 Spring 在幕后做了一些魔术仍然以 String 形式接收消息,但将它们收集在一个列表中供方法稍后处理? ).
@Bean
public Function<Message<List<String>>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
但这只是崩溃,因为它试图将单个字符串消息解析为字符串列表。
我如何准备代码以将所有这些字符串消息批处理到一个列表中?有这方面的例子吗?
...batch-mode
仅适用于支持它的绑定器(例如 Kafka、RabbitMQ)。它看起来不像 GCP 活页夹支持它(我没有看到对 属性 的引用)。
发布者批处理与消费者批处理无关。
我说 'not working as expected' 但实际上更像是“我真的不知道我是否在这里做适当的工作”,我觉得我正在混合不同方法的东西,但不是'真的没有关联。
现在我一直在使用 Spring Cloud Streams 来处理来自 PubSub 订阅的字符串类型的消息,到目前为止一切顺利,消息输入消息输出没有太多麻烦。
我现在想要实现的是收集,比方说,1000 条消息,处理它们并将它们一起发送到另一个 PubSub 主题。仍然不确定是将它们作为列表发送还是像现在一样单独发送,但同时发送(尽管这不应该与这个问题相关)。
现在我刚刚发现了以下内容属性。
spring.cloud.stream.bindings.input.consumer.batch-mode=true
连同以下更具体的 GCP 内容。
spring.cloud.gcp.pubsub.publisher.batching.enabled=true
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds=300
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold=100
所以第一个问题是... 它们有任何联系吗?我必须把第一个和其他三个放在一起吗?
我将之前的属性添加到我的 application.properties 文件后发生的事情实际上没有任何变化。消息不断到达和离开应用程序,没有任何问题,也没有任何批处理方法。
目前正在按以下方式使用功能特性。
@Bean
public Function<Message<String>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
我原以为这会因某些消息而崩溃,因为该方法只接收一个字符串,而不是一个字符串列表。由于它没有崩溃,我修改了上面的方法以接收一个 String 列表(也许 Spring 在幕后做了一些魔术仍然以 String 形式接收消息,但将它们收集在一个列表中供方法稍后处理? ).
@Bean
public Function<Message<List<String>>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
但这只是崩溃,因为它试图将单个字符串消息解析为字符串列表。
我如何准备代码以将所有这些字符串消息批处理到一个列表中?有这方面的例子吗?
...batch-mode
仅适用于支持它的绑定器(例如 Kafka、RabbitMQ)。它看起来不像 GCP 活页夹支持它(我没有看到对 属性 的引用)。
发布者批处理与消费者批处理无关。