一个 SCDF 来源,2 个处理者,但每个项目只有 1 个处理者

one SCDF source, 2 processors but only 1 processes each item

我的用例是这个的变体:

在示例中,1 个源向 rabbitmq 发出一个项目,两个 处理器都得到它。

我要相反的。我希望源向 rabbitmq 发送项目,但只有 1 个处理器处理每个项目。

假设我有:

1 个名为 source 的来源 2 个处理器,名为 processor1 和 processor2

所以源发出:A、B、C 到 rabbitmq

RabbitMQ 将发出 A

首先获得 A 的处理器将处理它 - 假设处理器 1 是幸运的并处理 A。

然后 RabbitMQ 将发出 B

由于处理器 1 忙于 A 而处理器 2 空闲处理器 2 处理 B

RabbitMQ 将发出 C

处理器 1 完成 A 并处于空闲状态,因此处理器 1 处理 C

我想出的Spring云数据流图是:

处理器A在上面,处理器B在下面

当我部署它并 运行 它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 接收 A、B,然后是 C

我很困惑,如果我想要的行为是我可以在 Spring 云数据流中实现的,或者是否有针对此的 RabbitMQ 设置,如消息删除的答案所暗示的那样

"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."

如果是这样,我可以在我的 Spring 云数据流源中设置它吗?或者它是 RabbitMQ 设置还是完全不同的东西

更新:

我已添加

spring.cloud.stream.bindings.input.group=consumerGroup

到我处理器的 application.properties 文件。

不幸的是,两个处理器都接收到完全相同的数据。

我是否需要在我的来源的 application.properties 中添加类似的条目?

我需要更改处理器上的注释吗?目前是:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

我是否需要以任何方式修改源上的注释?目前是:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = 
     @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

包含@Poller 是否以任何方式改变了这一点?

更新:

属性 的名字是 spring.cloud.stream.instanceCount 吗?

对于流式应用,您需要设置 ...consumer.group 属性 以便它们都在同一组中并竞争消息。

但这应该在 SCDF 中自动发生。