使用 spring-cloud-stream 在 rabbitmq 消息消费者上错误地调用了流侦听器
incorrectly invoked stream listeners on rabbitmq message consumer with spring-cloud-stream
我有一个项目是 spring-cloud-starter-parent:Hoxton.SR9
。
这里的主要 objective 是消息传递,因此有消息 (kafka/rabbit) 生产者和消费者声明:
spring:
cloud:
stream:
default-binder: kafka
contentType: application/*+avro
bindings:
loanContractWaitingForActivationInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanContractComponentsChangedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanOfferPresentedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
postingEntryRequestInput:
destination: PAY_PaymentServices_PostingEntryRequest
group: LH_LoanEventKeeper
contentType: application/*+avro
consumer:
maxAttempts: 1
autoCommitOnError: true
headerMode: embeddedHeaders
kafka:
bindings:
postingEntryRequestInput:
consumer:
autoCommitOnError: true
binder:
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:SSL}
ssl:
truststore:
location: path
password: ****
type: JKS
rabbit:
binder:
useSSL: ${AMQP_SSL_ENABLED:true}
bindings:
loanContractWaitingForActivationInput:
consumer:
bindingRoutingKey: loans.contracts.waitingForActivation
exchange-auto-delete: false
loanContractComponentsChangedInput:
consumer:
bindingRoutingKey: loans.contracts.components
exchange-auto-delete: false
loanOfferPresentedInput:
consumer:
bindingRoutingKey: loans.offers.presented
exchange-auto-delete: false
连同输入的 @StreamListener
s 一起提供:
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_WAITING_FOR_ACTIVATION_INPUT)
void receiveLoanContractWaitingForActivation(@Payload LoanContractWaitingForActivationRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_COMPONENTS_CHANGED_INPUT)
void receiveLoanContractComponentsChanged(@Payload LoanContractComponentsChangedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_OFFER_PRESENTED_INPUT)
void receiveLoanOfferPresented(@Payload LoanOfferPresentedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsKafka.POSTING_ENTRY_REQUEST_INPUT)
void receivePostingEntryRequest(@Payload PostingEntryRequest message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
配置了 rabbitmq 交换:
话虽如此,问题是使用此配置程序会在所有 4 个 streamListener 消费者端点上接收具有特定路由键的消息。反过来。所以首先
receiveLoanContractWaitingForActivation(..)
被调用时消息类型与第一个参数声明的负载不匹配,之后 receiveLoanContractComponentsChanged(..)
与第一个参数相同,然后是正确的(根据消息的路由键:receiveLoanOfferPresented(..)
),消息得到正确处理,因为调用了正确的侦听器,最后一个不正确的 receivePostingEntryRequest(..)
最终与第一个和第二个相同。
所以基本上我似乎无法将 @StreamListener
绑定到我认为将由路由键指定的交换中的正确队列。
能否请您指出missing/incorrect这里的配置?
谢谢!
您不能对所有 3 个绑定使用相同的 destination/group;每个都需要一个单独的队列。
路由键只是队列和交换器之间的一个绑定;使用您当前的设置,您有一个包含三个绑定的队列。
更改组,使其独一无二。
我有一个项目是 spring-cloud-starter-parent:Hoxton.SR9
。
这里的主要 objective 是消息传递,因此有消息 (kafka/rabbit) 生产者和消费者声明:
spring:
cloud:
stream:
default-binder: kafka
contentType: application/*+avro
bindings:
loanContractWaitingForActivationInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanContractComponentsChangedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanOfferPresentedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
postingEntryRequestInput:
destination: PAY_PaymentServices_PostingEntryRequest
group: LH_LoanEventKeeper
contentType: application/*+avro
consumer:
maxAttempts: 1
autoCommitOnError: true
headerMode: embeddedHeaders
kafka:
bindings:
postingEntryRequestInput:
consumer:
autoCommitOnError: true
binder:
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:SSL}
ssl:
truststore:
location: path
password: ****
type: JKS
rabbit:
binder:
useSSL: ${AMQP_SSL_ENABLED:true}
bindings:
loanContractWaitingForActivationInput:
consumer:
bindingRoutingKey: loans.contracts.waitingForActivation
exchange-auto-delete: false
loanContractComponentsChangedInput:
consumer:
bindingRoutingKey: loans.contracts.components
exchange-auto-delete: false
loanOfferPresentedInput:
consumer:
bindingRoutingKey: loans.offers.presented
exchange-auto-delete: false
连同输入的 @StreamListener
s 一起提供:
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_WAITING_FOR_ACTIVATION_INPUT)
void receiveLoanContractWaitingForActivation(@Payload LoanContractWaitingForActivationRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_COMPONENTS_CHANGED_INPUT)
void receiveLoanContractComponentsChanged(@Payload LoanContractComponentsChangedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_OFFER_PRESENTED_INPUT)
void receiveLoanOfferPresented(@Payload LoanOfferPresentedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsKafka.POSTING_ENTRY_REQUEST_INPUT)
void receivePostingEntryRequest(@Payload PostingEntryRequest message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
配置了 rabbitmq 交换:
话虽如此,问题是使用此配置程序会在所有 4 个 streamListener 消费者端点上接收具有特定路由键的消息。反过来。所以首先
receiveLoanContractWaitingForActivation(..)
被调用时消息类型与第一个参数声明的负载不匹配,之后 receiveLoanContractComponentsChanged(..)
与第一个参数相同,然后是正确的(根据消息的路由键:receiveLoanOfferPresented(..)
),消息得到正确处理,因为调用了正确的侦听器,最后一个不正确的 receivePostingEntryRequest(..)
最终与第一个和第二个相同。
所以基本上我似乎无法将 @StreamListener
绑定到我认为将由路由键指定的交换中的正确队列。
能否请您指出missing/incorrect这里的配置?
谢谢!
您不能对所有 3 个绑定使用相同的 destination/group;每个都需要一个单独的队列。
路由键只是队列和交换器之间的一个绑定;使用您当前的设置,您有一个包含三个绑定的队列。
更改组,使其独一无二。