Spring Cloud Stream - Solace PubSub+ - 消费者并发

Spring Cloud Stream - Solace PubSub+ - Consumer Concurrency

我正在结合使用 Spring Cloud Stream 3.0.6(Cloud:Hoxton.SR6,Boot 2.3。0.RELEASE)与 Solace PubSub+。 我无法让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。

这是我的 StreamListener 代码:

    @StreamListener(JobTriggerEventConsumerBinding.INPUT)
    protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message, 
                                     JobExecutionTriggerEvent event, 
                                     MessageHeaders headers) throws InterruptedException {
        
        log.info("Processing on thread: " + Thread.currentThread().getId());
      
        Thread.sleep(5000);
        
        log.info("Received the event!");
        log.info("-- Raw message:    {}", message);
        log.info("-- Headers:        {}", headers);
        log.info("-- Event:          {}", event);
        log.info("-- Event Contents: {}", event.getMessage());
    }

如果我向输入通道发送 3 条消息(使用我编写的生产者应用程序),我会看到消息在同一个线程(具有相同的 ID)上按顺序处理。我想实现的是消息由3个线程同时处理。

我的application.yml看起来如下:

spring:
  cloud:
    stream:
      default:
        group: defaultConsumers
        consumer:
          concurrency: 3
      bindings:
        jobTriggers:
          group: jobTriggerConsumers 
          consumer:
            concurrency: 3
            max-attempts: 1
      solace: 
        bindings:
          jobTriggers:
            consumer:
              requeue-rejected: true

我的 pom.xml 包含以下依赖项:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-solace</artifactId>
      <version>2.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-cloud-connectors</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

难道这是 Solace PubSub+ 活页夹的问题?我读过 spring.cloud.stream.binders.<name>.consumer.concurrency 的行为可能取决于活页夹的实现。

这可能是什么问题?

参考文献:

# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'

services:
  primary:
    container_name: pubSubStandardSingleNode
    image: solace/solace-pubsub-standard:latest
    shm_size: 1g
    ulimits:
      core: 1
      nofile:
        soft: 2448
        hard: 38048
    ports:
    #Port Mappings:  Ports are mapped straight through from host to
    #container.  This may result in port collisions on commonly used
    #ports that will cause failure of the container to start.
      #Web transport
      - '80:80'
      #Web transport over TLS
      - '443:443'
      #SEMP over TLS
      - '943:943'
      #MQTT Default VPN
      #- '1883:1883'
      #AMQP Default VPN over TLS
      - '5671:5671'
      #AMQP Default VPN
      - '5672:5672'
      #MQTT Default VPN over WebSockets
      #- '8000:8000'
      #MQTT Default VPN over WebSockets / TLS
      #- '8443:8443'
      #MQTT Default VPN over TLS
      #- '8883:8883'
      #SEMP / PubSub+ Manager
      - '8080:8080'
      #REST Default VPN
      #- '9000:9000'
      #REST Default VPN over TLS
      #- '9443:9443'
      #SMF
      - '55555:55555'
      #SMF Compressed
      #- '55003:55003'
      #SMF over TLS
      - '55443:55443'
    environment:
      - username_admin_globalaccesslevel=admin
      - username_admin_password=admin
      - system_scaling_maxconnectioncount=100

好的,我会自己回答这个问题。

使用 RabbitMQ binder 和 运行 rabbit 实例尝试上述配置,并发工作正常。所以我认为一定是 Solace 活页夹出了问题。

经过一番谷歌搜索后,我确实找到了确认:https://github.com/SolaceProducts/solace-spring-cloud/issues/7

显然,Solace PubSub+ binder 目前不支持并发,这真是令人失望。至少看起来问题正在解决。

这里还有一些社区讨论: https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest

更新

此问题似乎已在 Spring Cloud Stream Solace 活页夹的 2.1.1 版本中得到修复。 IE。使用此依赖项

<dependency>
 <groupId>com.solace.spring.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-solace</artifactId>
 <version>2.1.1</version
</dependency>

如果您使用的是 Spring Cloud Solace BOM,则至少需要转到版本 1.1.1

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>solace-spring-cloud-bom</artifactId>
      <version>1.1.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
 </dependencies>
</dependencyManagement>

这适用于 Spring 云 Hoxton.SR6