Spring Cloud Stream with Rabbit Binder - source/sink 队列名称不匹配
Spring Cloud Stream with Rabbit Binder - source/sink queue names don't match
最近开始玩Spring Cloud Stream和RabbitMQ binder
如果我理解正确,当两个服务要传递消息时,一个应该配置 source 来发送消息,另一个应该配置 sink用于接收消息 - 两者应使用相同的 channel.
我有 频道 命名为 testchannel
。不过,我注意到 source 创建了 RabbitMQ 绑定:
- 交换
testchannel
,
- 路由键
testchannel
,
- 队列
testchannel.default
(持久),
同时 sink 创建了 RabbitMQ 绑定:
- 交换
testchannel
,
- 路由键
#
,
- 队列
testchannel.anonymous.RANDOM_ID
(独占)。
为简洁起见,我跳过了前缀。
现在当我运行 两个应用程序。第一个发送消息到 testchannel
交换,然后路由到两个队列(我假设路由键是 testchannel
)。第二个应用程序使用来自 运行dom 队列的消息,但从未使用来自默认队列的消息。
我的另一个问题是 - 第二个应用程序仅使用 sink,但它也为输出通道创建绑定,默认情况下为 output
,因为我没有没有指定任何内容。
我使用相同的 Gradle 脚本构建这两个应用程序:
buildscript {
ext {
springBootVersion = '1.3.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
compile(
'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
)
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
}
}
第一个应用属性:
server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
第一应用源代码:
@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}
第二个应用属性:
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
第二个应用程序源代码:
@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
所以我的问题是。
- source 和 sink 不应该使用相同的通道并因此使用相同的代理队列吗?实现该目标的正确配置是什么? (我的目标是拥有多个 sink 服务实例,但只有一个应该使用消息。)
- 当我只使用 sink 时,框架是否应该创建输出绑定?如果是,如何禁用它。
默认;每个消费者都有自己的队列;这是一个 publish/subscribe 场景。
有消费者的概念group
,因此您可以让多个实例竞争来自同一队列的消息。
绑定生产者时,会绑定一个默认队列。
如果您想订阅default
群组;你必须设置群组:
spring.cloud.stream.bindings.input.group=default
如果您不提供组,您将获得一个排他的自动删除队列。
编辑
由于默认队列是持久的,你也应该设置
spring.cloud.stream.bindings.input.durableSubscription=true
避免在消费者绑定时发出警告,并在消费者先绑定且队列尚不存在时确保队列持久。
最近开始玩Spring Cloud Stream和RabbitMQ binder
如果我理解正确,当两个服务要传递消息时,一个应该配置 source 来发送消息,另一个应该配置 sink用于接收消息 - 两者应使用相同的 channel.
我有 频道 命名为 testchannel
。不过,我注意到 source 创建了 RabbitMQ 绑定:
- 交换
testchannel
, - 路由键
testchannel
, - 队列
testchannel.default
(持久),
同时 sink 创建了 RabbitMQ 绑定:
- 交换
testchannel
, - 路由键
#
, - 队列
testchannel.anonymous.RANDOM_ID
(独占)。
为简洁起见,我跳过了前缀。
现在当我运行 两个应用程序。第一个发送消息到 testchannel
交换,然后路由到两个队列(我假设路由键是 testchannel
)。第二个应用程序使用来自 运行dom 队列的消息,但从未使用来自默认队列的消息。
我的另一个问题是 - 第二个应用程序仅使用 sink,但它也为输出通道创建绑定,默认情况下为 output
,因为我没有没有指定任何内容。
我使用相同的 Gradle 脚本构建这两个应用程序:
buildscript {
ext {
springBootVersion = '1.3.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
compile(
'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
)
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
}
}
第一个应用属性:
server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
第一应用源代码:
@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}
第二个应用属性:
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
第二个应用程序源代码:
@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
所以我的问题是。
- source 和 sink 不应该使用相同的通道并因此使用相同的代理队列吗?实现该目标的正确配置是什么? (我的目标是拥有多个 sink 服务实例,但只有一个应该使用消息。)
- 当我只使用 sink 时,框架是否应该创建输出绑定?如果是,如何禁用它。
默认;每个消费者都有自己的队列;这是一个 publish/subscribe 场景。
有消费者的概念group
,因此您可以让多个实例竞争来自同一队列的消息。
绑定生产者时,会绑定一个默认队列。
如果您想订阅default
群组;你必须设置群组:
spring.cloud.stream.bindings.input.group=default
如果您不提供组,您将获得一个排他的自动删除队列。
编辑
由于默认队列是持久的,你也应该设置
spring.cloud.stream.bindings.input.durableSubscription=true
避免在消费者绑定时发出警告,并在消费者先绑定且队列尚不存在时确保队列持久。