使用特定于路由的 DLQ 配置 Java/Camel/AMQ
Configuring Java/Camel/AMQ with route-specific DLQs
Java 8/骆驼 2.19.x/AMQ 5.15.x 这里.
我有一个 Java 应用程序,它使用 Camel 来消费来自 AMQ 队列的消息,处理这些消息,并用它们做一些事情。有时路由的输出是将处理结果备份到另一个队列以供进一步下游处理,而不是always/necessarily。典型的 Java/Camel/AMQ 设置。
我的每个路由(我使用的是 Camel XML DSL)都有一个已配置的 <onException>
处理程序,通常如下所示:
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy logStackTrace="true"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.stacktrace}" loggingLevel="ERROR"/>
<rollback markRollbackOnly="true"/>
</onException>
非常简单:记录异常并回滚。
我想做的是,作为这个 <onException>
处理程序的一部分,放置 原始消息 (失败并导致抛出异常, 不是 异常!)在特定于路由的 DLQ 上("DLQ" 我的意思是只是一个可以发送失败消息的队列auditing/reporting/playback 目的)
意思是,如果我的应用程序有 30 条路由,每条路由都从 30 个不同的 AMQ 队列中消费,我将有 30 个不同的 "DLQs",它们各自的 <onException>
处理程序会将失败的消息发送到这些队列。
理想情况下,我希望此配置位于 AMQ 端(同样可能在 activem.xml
或类似的内部),这样如果 DLQ 目标需要,我就不需要更改代码或重新部署改变。但是,如果它只能从 Camel 内部进行 route/configs,那也没关系。
我假设我可以修改每个路由以包含其自己的原始消息的自定义目标 DLQ:
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy logStackTrace="true"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.stacktrace}" loggingLevel="ERROR"/>
<rollback markRollbackOnly="true"/>
<to uri="activemq:fizzbuzz.dlq"/>
</onException>
但我希望有比这更优雅的东西...
有什么办法可以做到这一点吗?
可能这种方式更适合你:
DeadLetterChannelBuilder errorHandlerBuilder = deadLetterChannel("jms:dummy");
errorHandlerBuilder.onPrepareFailure(exchange -> {
exchange.getIn().setHeader("CamelJmsDestinationName",exchange.getIn().getHeader("JMSDestination",String.class).concat(".DLQ"));
});
from("jms:input1")
.to("seda:process");
from("jms:input2")
.to("seda:process");
from("jms:input3")
.to("seda:process");
from("seda:process").errorHandler(errorHandlerBuilder)
.process(exchange -> {
throw new RuntimeException();
});
您可以在运行时计算 DLQ 队列名称。 DeadLetterChannel Builder 也可以像您的 onException 一样配置。
如果您想在 ActiveMQ 本身中进行配置,您还可以添加特定的 policyEntry
并在 deadLetterStrategy
中使用通配符,如本例所示:
为所有 INBOUND 队列创建专用 DLQ:
<policyEntry queue="*.INBOUND.>">
<deadLetterStrategy>
<individualDeadLetterStrategy processExpired="false" queuePrefix="" queueSuffix=".DLQ" useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
在那种情况下,我在所有入站队列中捕获了消费者的错误:
XXX.INBOUND.AAA -> XXX.INBOUND.AAA.DLQ
YYY.INBOUND -> YYY.INBOUND.DLQ
ZZZ.INBOUND.BBB.CCC -> ZZZ.INBOUND.BBB.CCC.DLQ
但是
NNN.MMM.INBOUND -> ActiveMQ.DLQ
因为模式 *
与 NNN.MMM
.
中的点 .
字符不匹配
您可以根据您的用例调整模式。
这对于管理未正确捕获异常的消费者非常有用,这是在 ActiveMQ 级别完成的。
Java 8/骆驼 2.19.x/AMQ 5.15.x 这里.
我有一个 Java 应用程序,它使用 Camel 来消费来自 AMQ 队列的消息,处理这些消息,并用它们做一些事情。有时路由的输出是将处理结果备份到另一个队列以供进一步下游处理,而不是always/necessarily。典型的 Java/Camel/AMQ 设置。
我的每个路由(我使用的是 Camel XML DSL)都有一个已配置的 <onException>
处理程序,通常如下所示:
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy logStackTrace="true"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.stacktrace}" loggingLevel="ERROR"/>
<rollback markRollbackOnly="true"/>
</onException>
非常简单:记录异常并回滚。
我想做的是,作为这个 <onException>
处理程序的一部分,放置 原始消息 (失败并导致抛出异常, 不是 异常!)在特定于路由的 DLQ 上("DLQ" 我的意思是只是一个可以发送失败消息的队列auditing/reporting/playback 目的)
意思是,如果我的应用程序有 30 条路由,每条路由都从 30 个不同的 AMQ 队列中消费,我将有 30 个不同的 "DLQs",它们各自的 <onException>
处理程序会将失败的消息发送到这些队列。
理想情况下,我希望此配置位于 AMQ 端(同样可能在 activem.xml
或类似的内部),这样如果 DLQ 目标需要,我就不需要更改代码或重新部署改变。但是,如果它只能从 Camel 内部进行 route/configs,那也没关系。
我假设我可以修改每个路由以包含其自己的原始消息的自定义目标 DLQ:
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy logStackTrace="true"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.stacktrace}" loggingLevel="ERROR"/>
<rollback markRollbackOnly="true"/>
<to uri="activemq:fizzbuzz.dlq"/>
</onException>
但我希望有比这更优雅的东西...
有什么办法可以做到这一点吗?
可能这种方式更适合你:
DeadLetterChannelBuilder errorHandlerBuilder = deadLetterChannel("jms:dummy");
errorHandlerBuilder.onPrepareFailure(exchange -> {
exchange.getIn().setHeader("CamelJmsDestinationName",exchange.getIn().getHeader("JMSDestination",String.class).concat(".DLQ"));
});
from("jms:input1")
.to("seda:process");
from("jms:input2")
.to("seda:process");
from("jms:input3")
.to("seda:process");
from("seda:process").errorHandler(errorHandlerBuilder)
.process(exchange -> {
throw new RuntimeException();
});
您可以在运行时计算 DLQ 队列名称。 DeadLetterChannel Builder 也可以像您的 onException 一样配置。
如果您想在 ActiveMQ 本身中进行配置,您还可以添加特定的 policyEntry
并在 deadLetterStrategy
中使用通配符,如本例所示:
为所有 INBOUND 队列创建专用 DLQ:
<policyEntry queue="*.INBOUND.>">
<deadLetterStrategy>
<individualDeadLetterStrategy processExpired="false" queuePrefix="" queueSuffix=".DLQ" useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
在那种情况下,我在所有入站队列中捕获了消费者的错误:
XXX.INBOUND.AAA -> XXX.INBOUND.AAA.DLQ
YYY.INBOUND -> YYY.INBOUND.DLQ
ZZZ.INBOUND.BBB.CCC -> ZZZ.INBOUND.BBB.CCC.DLQ
但是
NNN.MMM.INBOUND -> ActiveMQ.DLQ
因为模式 *
与 NNN.MMM
.
.
字符不匹配
您可以根据您的用例调整模式。
这对于管理未正确捕获异常的消费者非常有用,这是在 ActiveMQ 级别完成的。