使用 spring-cloud-stream 在 rabbitmq 消息消费者上错误地调用了流侦听器

incorrectly invoked stream listeners on rabbitmq message consumer with spring-cloud-stream

我有一个项目是 spring-cloud-starter-parent:Hoxton.SR9。 这里的主要 objective 是消息传递,因此有消息 (kafka/rabbit) 生产者和消费者声明:

spring:
  cloud:
    stream:
      default-binder: kafka
      contentType: application/*+avro
      bindings:
        loanContractWaitingForActivationInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        loanContractComponentsChangedInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        loanOfferPresentedInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        postingEntryRequestInput:
          destination: PAY_PaymentServices_PostingEntryRequest
          group: LH_LoanEventKeeper
          contentType: application/*+avro
          consumer:
            maxAttempts: 1
            autoCommitOnError: true
            headerMode: embeddedHeaders

      kafka:
        bindings:
          postingEntryRequestInput:
            consumer:
              autoCommitOnError: true
        binder:
          configuration:
            security:
              protocol: ${KAFKA_SECURITY_PROTOCOL:SSL}
            ssl:
              truststore:
                location: path
                password: ****
                type: JKS

      rabbit:
        binder:
          useSSL: ${AMQP_SSL_ENABLED:true}
        bindings:
          loanContractWaitingForActivationInput:
            consumer:
              bindingRoutingKey: loans.contracts.waitingForActivation
              exchange-auto-delete: false
          loanContractComponentsChangedInput:
            consumer:
              bindingRoutingKey: loans.contracts.components
              exchange-auto-delete: false
          loanOfferPresentedInput:
            consumer:
              bindingRoutingKey: loans.offers.presented
              exchange-auto-delete: false

连同输入的 @StreamListeners 一起提供:

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_WAITING_FOR_ACTIVATION_INPUT)
    void receiveLoanContractWaitingForActivation(@Payload LoanContractWaitingForActivationRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_COMPONENTS_CHANGED_INPUT)
    void receiveLoanContractComponentsChanged(@Payload LoanContractComponentsChangedRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_OFFER_PRESENTED_INPUT)
    void receiveLoanOfferPresented(@Payload LoanOfferPresentedRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsKafka.POSTING_ENTRY_REQUEST_INPUT)
    void receivePostingEntryRequest(@Payload PostingEntryRequest message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

配置了 rabbitmq 交换:

话虽如此,问题是使用此配置程序会在所有 4 个 streamListener 消费者端点上接收具有特定路由键的消息。反过来。所以首先 receiveLoanContractWaitingForActivation(..) 被调用时消息类型与第一个参数声明的负载不匹配,之后 receiveLoanContractComponentsChanged(..) 与第一个参数相同,然后是正确的(根据消息的路由键:receiveLoanOfferPresented(..)),消息得到正确处理,因为调用了正确的侦听器,最后一个不正确的 receivePostingEntryRequest(..) 最终与第一个和第二个相同。

所以基本上我似乎无法将 @StreamListener 绑定到我认为将由路由键指定的交换中的正确队列。

能否请您指出missing/incorrect这里的配置?

谢谢!

您不能对所有 3 个绑定使用相同的 destination/group;每个都需要一个单独的队列。

路由键只是队列和交换器之间的一个绑定;使用您当前的设置,您有一个包含三个绑定的队列。

更改组,使其独一无二。