Spring Cloud Stream - Solace PubSub+ - 消费者并发
Spring Cloud Stream - Solace PubSub+ - Consumer Concurrency
我正在结合使用 Spring Cloud Stream 3.0.6(Cloud:Hoxton.SR6,Boot 2.3。0.RELEASE)与 Solace PubSub+。
我无法让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。
这是我的 StreamListener
代码:
@StreamListener(JobTriggerEventConsumerBinding.INPUT)
protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message,
JobExecutionTriggerEvent event,
MessageHeaders headers) throws InterruptedException {
log.info("Processing on thread: " + Thread.currentThread().getId());
Thread.sleep(5000);
log.info("Received the event!");
log.info("-- Raw message: {}", message);
log.info("-- Headers: {}", headers);
log.info("-- Event: {}", event);
log.info("-- Event Contents: {}", event.getMessage());
}
如果我向输入通道发送 3 条消息(使用我编写的生产者应用程序),我会看到消息在同一个线程(具有相同的 ID)上按顺序处理。我想实现的是消息由3个线程同时处理。
我的application.yml
看起来如下:
spring:
cloud:
stream:
default:
group: defaultConsumers
consumer:
concurrency: 3
bindings:
jobTriggers:
group: jobTriggerConsumers
consumer:
concurrency: 3
max-attempts: 1
solace:
bindings:
jobTriggers:
consumer:
requeue-rejected: true
我的 pom.xml
包含以下依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</exclusion>
</exclusions>
</dependency>
难道这是 Solace PubSub+ 活页夹的问题?我读过 spring.cloud.stream.binders.<name>.consumer.concurrency
的行为可能取决于活页夹的实现。
这可能是什么问题?
参考文献:
- Solace PubSub+ Binder
- Docker 为本地运行的 Solace PubSub+ 实例撰写文件:
# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'
services:
primary:
container_name: pubSubStandardSingleNode
image: solace/solace-pubsub-standard:latest
shm_size: 1g
ulimits:
core: 1
nofile:
soft: 2448
hard: 38048
ports:
#Port Mappings: Ports are mapped straight through from host to
#container. This may result in port collisions on commonly used
#ports that will cause failure of the container to start.
#Web transport
- '80:80'
#Web transport over TLS
- '443:443'
#SEMP over TLS
- '943:943'
#MQTT Default VPN
#- '1883:1883'
#AMQP Default VPN over TLS
- '5671:5671'
#AMQP Default VPN
- '5672:5672'
#MQTT Default VPN over WebSockets
#- '8000:8000'
#MQTT Default VPN over WebSockets / TLS
#- '8443:8443'
#MQTT Default VPN over TLS
#- '8883:8883'
#SEMP / PubSub+ Manager
- '8080:8080'
#REST Default VPN
#- '9000:9000'
#REST Default VPN over TLS
#- '9443:9443'
#SMF
- '55555:55555'
#SMF Compressed
#- '55003:55003'
#SMF over TLS
- '55443:55443'
environment:
- username_admin_globalaccesslevel=admin
- username_admin_password=admin
- system_scaling_maxconnectioncount=100
好的,我会自己回答这个问题。
使用 RabbitMQ binder 和 运行 rabbit 实例尝试上述配置,并发工作正常。所以我认为一定是 Solace 活页夹出了问题。
经过一番谷歌搜索后,我确实找到了确认:https://github.com/SolaceProducts/solace-spring-cloud/issues/7
显然,Solace PubSub+ binder 目前不支持并发,这真是令人失望。至少看起来问题正在解决。
这里还有一些社区讨论:
https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest
更新
此问题似乎已在 Spring Cloud Stream Solace 活页夹的 2.1.1
版本中得到修复。 IE。使用此依赖项
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.1.1</version
</dependency>
如果您使用的是 Spring Cloud Solace BOM,则至少需要转到版本 1.1.1
:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>1.1.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
这适用于 Spring 云 Hoxton.SR6
。
我正在结合使用 Spring Cloud Stream 3.0.6(Cloud:Hoxton.SR6,Boot 2.3。0.RELEASE)与 Solace PubSub+。 我无法让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。
这是我的 StreamListener
代码:
@StreamListener(JobTriggerEventConsumerBinding.INPUT)
protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message,
JobExecutionTriggerEvent event,
MessageHeaders headers) throws InterruptedException {
log.info("Processing on thread: " + Thread.currentThread().getId());
Thread.sleep(5000);
log.info("Received the event!");
log.info("-- Raw message: {}", message);
log.info("-- Headers: {}", headers);
log.info("-- Event: {}", event);
log.info("-- Event Contents: {}", event.getMessage());
}
如果我向输入通道发送 3 条消息(使用我编写的生产者应用程序),我会看到消息在同一个线程(具有相同的 ID)上按顺序处理。我想实现的是消息由3个线程同时处理。
我的application.yml
看起来如下:
spring:
cloud:
stream:
default:
group: defaultConsumers
consumer:
concurrency: 3
bindings:
jobTriggers:
group: jobTriggerConsumers
consumer:
concurrency: 3
max-attempts: 1
solace:
bindings:
jobTriggers:
consumer:
requeue-rejected: true
我的 pom.xml
包含以下依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</exclusion>
</exclusions>
</dependency>
难道这是 Solace PubSub+ 活页夹的问题?我读过 spring.cloud.stream.binders.<name>.consumer.concurrency
的行为可能取决于活页夹的实现。
这可能是什么问题?
参考文献:
- Solace PubSub+ Binder
- Docker 为本地运行的 Solace PubSub+ 实例撰写文件:
# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'
services:
primary:
container_name: pubSubStandardSingleNode
image: solace/solace-pubsub-standard:latest
shm_size: 1g
ulimits:
core: 1
nofile:
soft: 2448
hard: 38048
ports:
#Port Mappings: Ports are mapped straight through from host to
#container. This may result in port collisions on commonly used
#ports that will cause failure of the container to start.
#Web transport
- '80:80'
#Web transport over TLS
- '443:443'
#SEMP over TLS
- '943:943'
#MQTT Default VPN
#- '1883:1883'
#AMQP Default VPN over TLS
- '5671:5671'
#AMQP Default VPN
- '5672:5672'
#MQTT Default VPN over WebSockets
#- '8000:8000'
#MQTT Default VPN over WebSockets / TLS
#- '8443:8443'
#MQTT Default VPN over TLS
#- '8883:8883'
#SEMP / PubSub+ Manager
- '8080:8080'
#REST Default VPN
#- '9000:9000'
#REST Default VPN over TLS
#- '9443:9443'
#SMF
- '55555:55555'
#SMF Compressed
#- '55003:55003'
#SMF over TLS
- '55443:55443'
environment:
- username_admin_globalaccesslevel=admin
- username_admin_password=admin
- system_scaling_maxconnectioncount=100
好的,我会自己回答这个问题。
使用 RabbitMQ binder 和 运行 rabbit 实例尝试上述配置,并发工作正常。所以我认为一定是 Solace 活页夹出了问题。
经过一番谷歌搜索后,我确实找到了确认:https://github.com/SolaceProducts/solace-spring-cloud/issues/7
显然,Solace PubSub+ binder 目前不支持并发,这真是令人失望。至少看起来问题正在解决。
这里还有一些社区讨论: https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest
更新
此问题似乎已在 Spring Cloud Stream Solace 活页夹的 2.1.1
版本中得到修复。 IE。使用此依赖项
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.1.1</version
</dependency>
如果您使用的是 Spring Cloud Solace BOM,则至少需要转到版本 1.1.1
:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>1.1.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
这适用于 Spring 云 Hoxton.SR6
。