在向其发布消息之前确保 AMQP 交换存在
Ensure that AMQP exchange exists before publishing a message to it
问题
一个 Java 应用程序使用 RabbitMQ 和客户端库 com.rabbitmq:amqp-client
连接到它。应用程序在初始化期间声明一个 AMQP 交换器,并定期向它发布消息。
如果该交换由于某种原因被删除,应用程序将无法向其发布消息,并且 AMQP 通道会被库自动关闭。因此,任何后续发布(即使在重新创建交换之后)都会失败,并出现以下异常:
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'logs' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)
如何保证交易所在发布前确实存在?我看到以下选项。
可能的解决方案
1。尝试在每次显式发布之前重新声明交换
由于 exchangeDeclare
是幂等的,如果交换已经到位则无效,我可以在任何发布之前明确声明交换:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message);
这段代码的问题在于它看起来很愚蠢,因为大多数时候交换已经就位,而声明只是多余的。
此外,如果交换恰好在声明和发布之间被删除,我仍然有麻烦。
2。通过 exchangeDeclarePassive
验证交换确实存在
我可以使用 exchangeDeclarePassive
在发布之前检查交换是否存在,但以下明显的方法不起作用:
private static void ensureExchangeExists(Channel channel) throws IOException {
try {
channel.exchangeDeclarePassive(EXCHANGE_NAME);
} catch (Exception e) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
}
}
问题是,如果缺少exchange,exchangeDeclarePassive
会抛出异常,通道会被库自动关闭。所以 catch 块中的代码不能声明交换(因为它试图在关闭的通道上执行操作)。
所以我不能再使用单个 AMQP 通道,必须以某种方式管理它们。
如果在声明和发布之间删除交换,我仍然有麻烦。
3。发布时捕获异常
你不能只用 try/catch 块包装调用 channel.basicPublish 因为如果缺少交换则不会抛出异常。 解释在这种情况下实际发生了什么。
但是您可以注册一个 ShutdownListener
,它能够检测通道何时关闭,调查原因(通过 cause.isInitiatedByApplication()
/cause.getReason()
)并执行必要的操作。
问题
问题是在向它发布消息之前确保 AMQP 交换存在的最佳方法是什么?为什么?
IME,选项 #1 接近于更好的选项。
我发现最有效的方法是将给定发布者的代码封装为允许我在第一次创建对象实例时声明/重新声明队列的方式。然后我可以重新使用同一个对象实例来发布消息,而不必再次重新声明交换。
如果我创建对象的新实例,它会重新声明交换。但是重复使用同一个实例可以防止这种情况发生。
这个策略对我很有效。
另一种选择是在应用程序启动时预先定义和预先声明您的交换、队列和绑定。这样你就不用再担心了。这里的缺点是,您必须预先确定所有交换、队列和绑定...这在某些应用程序中有效,但在其他应用程序中无效。
经过一番调查后,我选择了选项 #3(发布时捕获异常)并结合 Publisher Confirms,这是跟踪未成功发布的消息所必需的。
问题
一个 Java 应用程序使用 RabbitMQ 和客户端库 com.rabbitmq:amqp-client
连接到它。应用程序在初始化期间声明一个 AMQP 交换器,并定期向它发布消息。
如果该交换由于某种原因被删除,应用程序将无法向其发布消息,并且 AMQP 通道会被库自动关闭。因此,任何后续发布(即使在重新创建交换之后)都会失败,并出现以下异常:
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'logs' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)
如何保证交易所在发布前确实存在?我看到以下选项。
可能的解决方案
1。尝试在每次显式发布之前重新声明交换
由于 exchangeDeclare
是幂等的,如果交换已经到位则无效,我可以在任何发布之前明确声明交换:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message);
这段代码的问题在于它看起来很愚蠢,因为大多数时候交换已经就位,而声明只是多余的。
此外,如果交换恰好在声明和发布之间被删除,我仍然有麻烦。
2。通过 exchangeDeclarePassive
验证交换确实存在
我可以使用 exchangeDeclarePassive
在发布之前检查交换是否存在,但以下明显的方法不起作用:
private static void ensureExchangeExists(Channel channel) throws IOException {
try {
channel.exchangeDeclarePassive(EXCHANGE_NAME);
} catch (Exception e) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false, true, null);
}
}
问题是,如果缺少exchange,exchangeDeclarePassive
会抛出异常,通道会被库自动关闭。所以 catch 块中的代码不能声明交换(因为它试图在关闭的通道上执行操作)。
所以我不能再使用单个 AMQP 通道,必须以某种方式管理它们。
如果在声明和发布之间删除交换,我仍然有麻烦。
3。发布时捕获异常
你不能只用 try/catch 块包装调用 channel.basicPublish 因为如果缺少交换则不会抛出异常。
但是您可以注册一个 ShutdownListener
,它能够检测通道何时关闭,调查原因(通过 cause.isInitiatedByApplication()
/cause.getReason()
)并执行必要的操作。
问题
问题是在向它发布消息之前确保 AMQP 交换存在的最佳方法是什么?为什么?
IME,选项 #1 接近于更好的选项。
我发现最有效的方法是将给定发布者的代码封装为允许我在第一次创建对象实例时声明/重新声明队列的方式。然后我可以重新使用同一个对象实例来发布消息,而不必再次重新声明交换。
如果我创建对象的新实例,它会重新声明交换。但是重复使用同一个实例可以防止这种情况发生。
这个策略对我很有效。
另一种选择是在应用程序启动时预先定义和预先声明您的交换、队列和绑定。这样你就不用再担心了。这里的缺点是,您必须预先确定所有交换、队列和绑定...这在某些应用程序中有效,但在其他应用程序中无效。
经过一番调查后,我选择了选项 #3(发布时捕获异常)并结合 Publisher Confirms,这是跟踪未成功发布的消息所必需的。