一个 SCDF 来源,2 个处理者,但每个项目只有 1 个处理者
one SCDF source, 2 processors but only 1 processes each item
我的用例是这个的变体:
在示例中,1 个源向 rabbitmq 发出一个项目,两个 处理器都得到它。
我要相反的。我希望源向 rabbitmq 发送项目,但只有 1 个处理器处理每个项目。
假设我有:
1 个名为 source 的来源
2 个处理器,名为 processor1 和 processor2
所以源发出:A、B、C 到 rabbitmq
RabbitMQ 将发出 A
首先获得 A 的处理器将处理它 - 假设处理器 1 是幸运的并处理 A。
然后 RabbitMQ 将发出 B
由于处理器 1 忙于 A 而处理器 2 空闲处理器 2 处理 B
RabbitMQ 将发出 C
处理器 1 完成 A 并处于空闲状态,因此处理器 1 处理 C
我想出的Spring云数据流图是:
处理器A在上面,处理器B在下面
当我部署它并 运行 它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 接收 A、B,然后是 C
我很困惑,如果我想要的行为是我可以在 Spring 云数据流中实现的,或者是否有针对此的 RabbitMQ 设置,如消息删除的答案所暗示的那样
"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."
如果是这样,我可以在我的 Spring 云数据流源中设置它吗?或者它是 RabbitMQ 设置还是完全不同的东西
更新:
我已添加
spring.cloud.stream.bindings.input.group=consumerGroup
到我处理器的 application.properties 文件。
不幸的是,两个处理器都接收到完全相同的数据。
我是否需要在我的来源的 application.properties 中添加类似的条目?
我需要更改处理器上的注释吗?目前是:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
我是否需要以任何方式修改源上的注释?目前是:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
包含@Poller 是否以任何方式改变了这一点?
更新:
属性 的名字是 spring.cloud.stream.instanceCount 吗?
对于流式应用,您需要设置 ...consumer.group 属性 以便它们都在同一组中并竞争消息。
但这应该在 SCDF 中自动发生。
我的用例是这个的变体:
在示例中,1 个源向 rabbitmq 发出一个项目,两个 处理器都得到它。
我要相反的。我希望源向 rabbitmq 发送项目,但只有 1 个处理器处理每个项目。
假设我有:
1 个名为 source 的来源 2 个处理器,名为 processor1 和 processor2
所以源发出:A、B、C 到 rabbitmq
RabbitMQ 将发出 A
首先获得 A 的处理器将处理它 - 假设处理器 1 是幸运的并处理 A。
然后 RabbitMQ 将发出 B
由于处理器 1 忙于 A 而处理器 2 空闲处理器 2 处理 B
RabbitMQ 将发出 C
处理器 1 完成 A 并处于空闲状态,因此处理器 1 处理 C
我想出的Spring云数据流图是:
处理器A在上面,处理器B在下面
当我部署它并 运行 它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 接收 A、B,然后是 C
我很困惑,如果我想要的行为是我可以在 Spring 云数据流中实现的,或者是否有针对此的 RabbitMQ 设置,如消息删除的答案所暗示的那样
"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."
如果是这样,我可以在我的 Spring 云数据流源中设置它吗?或者它是 RabbitMQ 设置还是完全不同的东西
更新:
我已添加
spring.cloud.stream.bindings.input.group=consumerGroup
到我处理器的 application.properties 文件。
不幸的是,两个处理器都接收到完全相同的数据。
我是否需要在我的来源的 application.properties 中添加类似的条目?
我需要更改处理器上的注释吗?目前是:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
我是否需要以任何方式修改源上的注释?目前是:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
包含@Poller 是否以任何方式改变了这一点?
更新:
属性 的名字是 spring.cloud.stream.instanceCount 吗?
对于流式应用,您需要设置 ...consumer.group 属性 以便它们都在同一组中并竞争消息。
但这应该在 SCDF 中自动发生。