在 RabbitMQ 中配置消费者取消
Configuring Consumer Cancellation in RabbitMQ
我们正在使用带有镜像队列的 2 节点主动-主动 RabbitMQ 集群。镜像策略为:
"policies":[{"vhost":"/","name":"ha-all","pattern":"","apply->to":"all","definition":{"ha-mode":"all","ha-sync-mode":"automatic"},"priority":0}]
版本:RabbitMQ 3.5.4,Erlang 17.4,spring-amqp/spring-rabbit:1.4.5.RELEASE
现在,我们正在尝试实现消费者取消,如Highly Available Queues所述。
但是,由于我们没有使用过channel,所以我们不能使用上面给出的{{basicConsumer}}方法link。
如何在配置中将“x-cancel-on-ha-failover”设置为真?
豆子 xml 是这样的:
<rabbit:connection-factory id="connectionFactory"
addresses="localhost:5672"
username="guest"
password="guest"
channel-cache-size="5" />
<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- Spring AMQP Template -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" message-converter="jsonMessageConverter" />
<!-- in case connection is broken then Retry based on the below policy -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="2" />
<property name="maxInterval" value="30000" />
</bean>
</property>
</bean>
<rabbit:queue name="testQueue" durable="true">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<bean id="messsageConsumer" class="consumer.RabbitConsumer">
</bean>
<rabbit:listener-container
connection-factory="connectionFactory" concurrency="5" max-concurrency="5" message-converter="jsonMessageConverter">
<rabbit:listener queues="testQueue" ref="messsageConsumer" />
</rabbit:listener-container>
<rabbit:listener-container>
实际上在后台填充了一个 SimpleMessageListenerContainer
bean。最后一个支持public void setConsumerArguments(Map<String, Object> args)
。
因此,要满足您的要求,您只需为 messsageConsumer
.
构建原始 SimpleMessageListenerContainer
<bean>
在您为您的应用程序修复该问题的同时,我会要求您提供有关添加 <consumer-arguments>
组件的 JIRA。我们也许可以在当前的 GA 截止日期前解决它。
我们正在使用带有镜像队列的 2 节点主动-主动 RabbitMQ 集群。镜像策略为:
"policies":[{"vhost":"/","name":"ha-all","pattern":"","apply->to":"all","definition":{"ha-mode":"all","ha-sync-mode":"automatic"},"priority":0}]
版本:RabbitMQ 3.5.4,Erlang 17.4,spring-amqp/spring-rabbit:1.4.5.RELEASE
现在,我们正在尝试实现消费者取消,如Highly Available Queues所述。
但是,由于我们没有使用过channel,所以我们不能使用上面给出的{{basicConsumer}}方法link。
如何在配置中将“x-cancel-on-ha-failover”设置为真?
豆子 xml 是这样的:
<rabbit:connection-factory id="connectionFactory"
addresses="localhost:5672"
username="guest"
password="guest"
channel-cache-size="5" />
<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- Spring AMQP Template -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" message-converter="jsonMessageConverter" />
<!-- in case connection is broken then Retry based on the below policy -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="2" />
<property name="maxInterval" value="30000" />
</bean>
</property>
</bean>
<rabbit:queue name="testQueue" durable="true">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<bean id="messsageConsumer" class="consumer.RabbitConsumer">
</bean>
<rabbit:listener-container
connection-factory="connectionFactory" concurrency="5" max-concurrency="5" message-converter="jsonMessageConverter">
<rabbit:listener queues="testQueue" ref="messsageConsumer" />
</rabbit:listener-container>
<rabbit:listener-container>
实际上在后台填充了一个 SimpleMessageListenerContainer
bean。最后一个支持public void setConsumerArguments(Map<String, Object> args)
。
因此,要满足您的要求,您只需为 messsageConsumer
.
SimpleMessageListenerContainer
<bean>
在您为您的应用程序修复该问题的同时,我会要求您提供有关添加 <consumer-arguments>
组件的 JIRA。我们也许可以在当前的 GA 截止日期前解决它。