当 RabbitMQ 交换不存在时如何处理错误(消息通过消息网关接口发送)
How to handle errors when RabbitMQ exchange doesn't exist (and messages are sent through a messaging gateway interface)
我想知道在以下情况下处理错误的规范方法是什么(代码是一个最小的工作示例):
- 消息通过定义其
defaultRequestChannel
和 @Gateway
方法的消息传递网关发送:
@MessagingGateway(name = MY_GATEWAY, defaultRequestChannel = INPUT_CHANNEL)
public interface MyGateway
{
@Gateway
public void sendMessage(String message);
- 消息从通道中读取并通过 AMQP 出站适配器发送:
@Bean
public IntegrationFlow apiMutuaInputFlow()
{
return IntegrationFlows
.from(INPUT_CHANNEL)
.handle(Amqp.outboundAdapter(rabbitConfig.myTemplate()))
.get();
}
- RabbitMQ 配置是骨架的:
@Configuration
public class RabbitMqConfiguration
{
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
public RabbitTemplate myTemplate()
{
RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
r.setExchange(INPUT_QUEUE_NAME);
r.setConnectionFactory(rabbitConnectionFactory);
return r;
}
}
我通常包含一个 bean 来定义我所依赖的 RabbitMQ 配置(交换、队列和绑定),它实际上工作正常。但是在测试失败场景时,我发现了一种我不知道如何使用 Spring 集成正确处理的情况。步骤是:
- 删除配置 RabbitMQ 的 beans
- 运行 针对未配置的普通 RabbitMQ 实例的流程。
我期望的是:
- 无法传递消息,因为找不到交换。
- 要么我找到一些方法从调用者线程上的消息传递网关获取异常。
- 要么我找到一些方法来拦截这个错误。
我发现了什么:
- 无法传递消息,因为找不到交换,实际上每次调用
@Gateway
方法时都会记录此错误消息。
2020-02-11 08:18:40.746 ERROR 42778 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my.exchange' in vhost '/', class-id=60, method-id=40)
- 网关没有失败,我也没有找到配置它的方法(例如:向接口方法添加
throws
子句,配置事务通道,设置 wait-for-confirm
和confirm-timeout
).
- 我还没有找到一种方法来捕获
CachingConectionFactory
错误(例如:配置交易渠道)。
- 我还没有找到在另一个通道(在网关的
errorChannel
上指定)或 Spring 集成的默认 errorChannel
.[=56 中捕获错误消息的方法=]
我知道消息传递网关可能不会向上游传播此类故障,消息传递网关的工作是将调用者与消息传递隔离开来 API,但我绝对希望这样的错误能够被拦截。
你能给我指出正确的方向吗?
谢谢。
RabbitMQ 本质上是异步的,这是它表现如此出色的原因之一。
但是,您可以通过启用确认和 returns 并设置此选项来阻止呼叫者:
/**
* Set to true if you want to block the calling thread until a publisher confirm has
* been received. Requires a template configured for returns. If a confirm is not
* received within the confirm timeout or a negative acknowledgment or returned
* message is received, an exception will be thrown. Does not apply to the gateway
* since it blocks awaiting the reply.
* @param waitForConfirm true to block until the confirmation or timeout is received.
* @since 5.2
* @see #setConfirmTimeout(long)
* @see #setMultiSend(boolean)
*/
public void setWaitForConfirm(boolean waitForConfirm) {
this.waitForConfirm = waitForConfirm;
}
(使用 DSL .waitForConfirm(true)
)。
这还需要确认相关表达式。这是其中一个测试用例的示例
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return f -> f.handle(Amqp.outboundAdapter(template)
.exchangeName("")
.routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
.confirmCorrelationFunction(msg -> msg)
.waitForConfirm(true));
}
@Bean
public CachingConnectionFactory cf() {
CachingConnectionFactory ccf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
ccf.setPublisherReturns(true);
return ccf;
}
@Bean
public RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
rabbitTemplate.setMandatory(true); // for returns
rabbitTemplate.setReceiveTimeout(10_000);
return rabbitTemplate;
}
请记住,这会大大降低速度(类似于使用事务),因此您可能需要重新考虑是否要在每次发送时都这样做(除非性能不是问题)。
我想知道在以下情况下处理错误的规范方法是什么(代码是一个最小的工作示例):
- 消息通过定义其
defaultRequestChannel
和@Gateway
方法的消息传递网关发送:
@MessagingGateway(name = MY_GATEWAY, defaultRequestChannel = INPUT_CHANNEL)
public interface MyGateway
{
@Gateway
public void sendMessage(String message);
- 消息从通道中读取并通过 AMQP 出站适配器发送:
@Bean
public IntegrationFlow apiMutuaInputFlow()
{
return IntegrationFlows
.from(INPUT_CHANNEL)
.handle(Amqp.outboundAdapter(rabbitConfig.myTemplate()))
.get();
}
- RabbitMQ 配置是骨架的:
@Configuration
public class RabbitMqConfiguration
{
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
public RabbitTemplate myTemplate()
{
RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
r.setExchange(INPUT_QUEUE_NAME);
r.setConnectionFactory(rabbitConnectionFactory);
return r;
}
}
我通常包含一个 bean 来定义我所依赖的 RabbitMQ 配置(交换、队列和绑定),它实际上工作正常。但是在测试失败场景时,我发现了一种我不知道如何使用 Spring 集成正确处理的情况。步骤是:
- 删除配置 RabbitMQ 的 beans
- 运行 针对未配置的普通 RabbitMQ 实例的流程。
我期望的是:
- 无法传递消息,因为找不到交换。
- 要么我找到一些方法从调用者线程上的消息传递网关获取异常。
- 要么我找到一些方法来拦截这个错误。
我发现了什么:
- 无法传递消息,因为找不到交换,实际上每次调用
@Gateway
方法时都会记录此错误消息。
2020-02-11 08:18:40.746 ERROR 42778 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my.exchange' in vhost '/', class-id=60, method-id=40)
- 网关没有失败,我也没有找到配置它的方法(例如:向接口方法添加
throws
子句,配置事务通道,设置wait-for-confirm
和confirm-timeout
). - 我还没有找到一种方法来捕获
CachingConectionFactory
错误(例如:配置交易渠道)。 - 我还没有找到在另一个通道(在网关的
errorChannel
上指定)或 Spring 集成的默认errorChannel
.[=56 中捕获错误消息的方法=]
我知道消息传递网关可能不会向上游传播此类故障,消息传递网关的工作是将调用者与消息传递隔离开来 API,但我绝对希望这样的错误能够被拦截。
你能给我指出正确的方向吗?
谢谢。
RabbitMQ 本质上是异步的,这是它表现如此出色的原因之一。
但是,您可以通过启用确认和 returns 并设置此选项来阻止呼叫者:
/**
* Set to true if you want to block the calling thread until a publisher confirm has
* been received. Requires a template configured for returns. If a confirm is not
* received within the confirm timeout or a negative acknowledgment or returned
* message is received, an exception will be thrown. Does not apply to the gateway
* since it blocks awaiting the reply.
* @param waitForConfirm true to block until the confirmation or timeout is received.
* @since 5.2
* @see #setConfirmTimeout(long)
* @see #setMultiSend(boolean)
*/
public void setWaitForConfirm(boolean waitForConfirm) {
this.waitForConfirm = waitForConfirm;
}
(使用 DSL .waitForConfirm(true)
)。
这还需要确认相关表达式。这是其中一个测试用例的示例
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return f -> f.handle(Amqp.outboundAdapter(template)
.exchangeName("")
.routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
.confirmCorrelationFunction(msg -> msg)
.waitForConfirm(true));
}
@Bean
public CachingConnectionFactory cf() {
CachingConnectionFactory ccf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
ccf.setPublisherReturns(true);
return ccf;
}
@Bean
public RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
rabbitTemplate.setMandatory(true); // for returns
rabbitTemplate.setReceiveTimeout(10_000);
return rabbitTemplate;
}
请记住,这会大大降低速度(类似于使用事务),因此您可能需要重新考虑是否要在每次发送时都这样做(除非性能不是问题)。