confirm-ack and confirm-nack channels are not getting invoked
I am trying to configure the ack and nack channels, but I am getting the following error:
"Only one confirm call back channel can be configured at a time"
Below are the things I tried:
1. Set confirm-correlation-expression="#root" (no result)
2. Changed the amqp template as shown below
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" publisher-returns="true" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" /> <!-- for nacks -->
<rabbit:admin connection-factory="connectionFactory" />
With that change the error went away, but the ack channel is still not getting invoked.
Can anyone help me with this?
Here is the MQ configuration:
<rabbit:template id="nonTransactionalRabbitTemplate"
    connection-factory="nonTransactionalConnectionFactory"
    mandatory="true"
    channel-transacted="false"
    confirm-callback="confirmCallback"
    return-callback="returnCallback" />
<rabbit:connection-factory id="nonTransactionalConnectionFactory"
    connection-factory="rabbitClientConnectionFactory"
    publisher-confirms="true"
    publisher-returns="true"/>
<bean id="rabbitClientConnectionFactory" class="com.rabbitmq.client.ConnectionFactory" >
    <property name="uri" value="${mq.uri}" />
    <property name="requestedHeartbeat" value="30" />
</bean>
Here is my outbound adapter:
<int-amqp:outbound-channel-adapter channel="abc"
    routing-key="xyz"
    amqp-template="amqpTemplate"
    confirm-correlation-expression="payload"
    confirm-ack-channel="successRespTransformChannel"
    confirm-nack-channel="failureRespTransformChannel"
    return-channel="failureRespTransformChannel"
    mapped-request-headers="*" />
Here is my service activator:
<chain input-channel="successRespTransformChannel">
    <int:header-enricher>
        <error-channel ref="failed-publishing" />
    </int:header-enricher>
    <service-activator id="successResp" expression="@abc.addRequestTracking(payload.id,'success')"/>
</chain>
When the template is used in the adapter, you must not set your own callbacks on it:
confirm-callback="confirmCallback"
return-callback="returnCallback" />
The adapter sets itself as the callback for confirms and returns. That fails because you have already set the callbacks.
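A minimal sketch of that change, reusing the bean names from the question. It assumes the adapter is meant to use nonTransactionalRabbitTemplate (the question's adapter references amqpTemplate, so that wiring is an assumption); the only substantive difference from the original configuration is that confirm-callback and return-callback are gone from the template:

```xml
<!-- Template without its own callbacks, so the adapter can register itself
     as the confirm and return callback. -->
<rabbit:template id="nonTransactionalRabbitTemplate"
    connection-factory="nonTransactionalConnectionFactory"
    mandatory="true"
    channel-transacted="false" />

<!-- Assumption: the adapter points at the template declared above. -->
<int-amqp:outbound-channel-adapter channel="abc"
    routing-key="xyz"
    amqp-template="nonTransactionalRabbitTemplate"
    confirm-correlation-expression="payload"
    confirm-ack-channel="successRespTransformChannel"
    confirm-nack-channel="failureRespTransformChannel"
    return-channel="failureRespTransformChannel"
    mapped-request-headers="*" />
```

The connection factory still needs publisher-confirms="true" and publisher-returns="true", as it already has in the question.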
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" /> <!-- for nacks -->
mandatory enables returns, not nacks.
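To make the distinction concrete, here is a sketch with a dedicated return channel. The channel name returnedMessages is hypothetical; everything else comes from the question. With mandatory="true", a message that cannot be routed to any queue is delivered to the return-channel. As far as RabbitMQ's confirm semantics go, the broker still confirms such a message, so it should not be expected on the confirm-nack-channel, which is for messages the broker reports it could not handle.

```xml
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" /> <!-- for returns -->

<int-amqp:outbound-channel-adapter channel="abc"
    routing-key="xyz"
    amqp-template="amqpTemplate"
    confirm-correlation-expression="payload"
    confirm-ack-channel="successRespTransformChannel"
    confirm-nack-channel="failureRespTransformChannel"
    return-channel="returnedMessages" />

<!-- Hypothetical channel that collects unroutable (returned) messages. -->
<int:channel id="returnedMessages">
    <int:queue />
</int:channel>
```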
EDIT
I don't know what you are doing wrong. I just wrote a quick test case...
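package com.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@ImportResource("context.xml")
public class So36546646Application {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(So36546646Application.class, args);
        // Send one message through the outbound adapter and wait for its confirm.
        ctx.getBean("out", MessageChannel.class).send(new GenericMessage<>("foo"));
        boolean received = ctx.getBean(MyService.class).latch.await(10, TimeUnit.SECONDS);
        if (!received) {
            System.err.println("Did not receive ack");
        }
        // Clean up the anonymous queue before shutting down.
        ctx.getBean(RabbitAdmin.class).deleteQueue(ctx.getBean(Queue.class).getName());
        ctx.close();
    }

    public static class MyService {
        private final CountDownLatch latch = new CountDownLatch(1);
        // Invoked by the service activator on the "acks" channel.
        public void handle(Message<?> ack) {
            System.out.println("ack:" + ack);
            latch.countDown();
        }
    }
}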
and
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
    <rabbit:connection-factory id="cf" publisher-confirms="true" publisher-returns="true" host="localhost" />
    <rabbit:template id="t" connection-factory="cf" mandatory="true" />
    <rabbit:admin connection-factory="cf" />
    <rabbit:queue id="anon" />
    <int:channel id="out" />
    <int-amqp:outbound-channel-adapter
        channel="out"
        amqp-template="t"
        routing-key="#{anon.name}"
        confirm-correlation-expression="payload"
        confirm-ack-channel="acks"
        confirm-nack-channel="acks"
        return-channel="returns" />
    <int:service-activator input-channel="acks" ref="service" />
    <bean id="service" class="com.example.So36546646Application$MyService" />
    <int:channel id="returns">
        <int:queue />
    </int:channel>
</beans>
And it works fine:
ack:GenericMessage [payload=foo, headers={amqp_publishConfirm=true, id=5eed89bf-11b6-76a5-34ed-0091c6bac2c8, timestamp=1460464254229}]