Camel 3:如何使用 interceptSendToEndpoint 从 `onException` 拦截路由
Camel 3: How to intercept a route from `onException` using `interceptSendToEndpoint`
问题:
在从 Camel 2 迁移到 3 的过程中,我的错误路由测试失败了。
我遵循的模式是强制异常并断言 onException()
块使用适当的标签发送到我的指标路由。
我正在使用 uri 模式匹配来单独测试每个标记是否已发出...这对测试模式有很大影响
注意:在下面的两个示例中,createRouteBuilder()
方法是相同的
路过骆驼 2 示例
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel2Test : CamelTestSupport() {
val startUri = "direct:start"
val baseMetricsUri = "micrometer:counter:errors"
// Want to use pattern to test each individual tag here
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(fullMetricsUri)
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `metric with tag B is emitted`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
context.getRouteDefinition(startUri)
.adviceWith(context, object : RouteBuilder() {
override fun configure() {
interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
})
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}
失败的 Camel 3 示例
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel3Test : CamelTestSupport() {
val startUri = "direct:start"
val baseMetricsUri = "micrometer:counter:errors"
// Want to use pattern to test each individual tag here
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(fullMetricsUri)
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `metric with tag B is emitted`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}
mockEndpoint
未接收到交换,而是仍在转到指标端点。
问题:
在 Camel 3 中,我怎样才能像在 Camel 2 中那样拦截一条路线使用模式?手动测试显示错误路由在 prod 中的行为符合预期,因此这似乎是一个测试配置问题。
其他详情:
- This unit test from the camel repo 演示了我正在尝试做的事情,但是通过手动拦截路线而不是直接在路线中使用
mock:
。
当我不需要模式匹配时,这种替代方法就有效
override fun isMockEndpointsAndSkip() = myUri
// ... in test
getMockEndpoint("mock:$myUri").expectedMessageCount(1)
首先,非常感谢您提供了一个结构良好的问题和适当的代码示例!模拟组件的手册提到了一项功能的介绍 'Mocking Existing Endpoints' 很可能这就是阻止您的原因。我不太确定哪个版本的 Camel 引入了这个特性。
无论如何,要绕过当前的限制,您可以使用自动模拟功能本身。您的测试方法可以更改如下以使其正常工作。
@Test
fun `exception is routed to error logging route`() {
val exchange = createExchangeWithBody("")
// Create new mock endpoint that will replace our error route
val mockEndpoint = getMockEndpoint("mock:$errorUri")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.mockEndpoints(errorUri)
routeBuilder.interceptSendToEndpoint(errorUri)
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied()
}
对原始代码进行了两处更改。
- 模拟端点已从
mock:test
重命名,以符合自动生成的模拟端点类型 (mock:direct:errors
)
- 调用
routeBuilder.mockEndpoints(errorUri)
以便 camel 可以自动注入 Mocks,用于 errorUri
描述的模式
除此之外,还可以替换下面的方块
routeBuilder.mockEndpoints(errorUri)
routeBuilder.interceptSendToEndpoint(errorUri)
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
只有一个衬里 routeBuilder.mockEndpointsAndSkip(errorUri)
,除非有特定的理由使用 intercept
,正如您在问题中提到的那样。
补充观察:
运行 您的代码,没有任何更改,清楚地显示了 Mock 端点中的 RouteReifier
挂钩,mock://test
代替了 direct:errors
。此外,context
似乎也有一个合适的 endpointStrategy
。
这可能是一个错误。尽管有简单的替代方法,但请考虑在 ASF Jira 上也提出这个问题。
14:32:34.307 [main] INFO org.apache.camel.reifier.RouteReifier - Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
<from uri="direct:start"/>
<onException>
<exception>java.lang.Exception</exception>
<to uri="direct:errors"/>
</onException>
<throwException/>
</route>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
<from uri="direct:start"/>
<onException>
<exception>java.lang.Exception</exception>
<to uri="direct:errors"/>
</onException>
<interceptSendToEndpoint skipSendToOriginalEndpoint="true" uri="direct:errors">
<to uri="mock://test"/>
</interceptSendToEndpoint>
<throwException/>
</route>
测试通过IDE
Java实现(如果有人需要的话)
import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Assert;
import org.junit.Test;
public class Camel3RouteTest extends CamelTestSupport {
private static final String startUri = "direct:start";
private static final String errorUri = "direct:errors";
private static final String mockErrorURI = "mock:"+ errorUri;
private static final String ERROR_MESSAGE = "ERROR MESSAGE!";
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class)
.to(errorUri);
from(errorUri)
.routeId(errorUri)
.log("error happened!");
from(startUri)
.routeId(startUri)
.throwException(new Exception(ERROR_MESSAGE));
}
};
}
@Test
public void testExecution() throws Exception {
AdviceWithRouteBuilder.adviceWith(context, startUri, adviceWithRouteBuilder -> {
//a.mockEndpointsAndSkip(errorUri);
adviceWithRouteBuilder.mockEndpoints(errorUri);
adviceWithRouteBuilder.interceptSendToEndpoint(errorUri).skipSendToOriginalEndpoint().to(mockErrorURI);
});
MockEndpoint mockEndpoint = getMockEndpoint(mockErrorURI);
mockEndpoint.setExpectedMessageCount(1);
context.start();
sendBody(startUri, "A Test message");
assertMockEndpointsSatisfied();
Assert.assertNotNull(mockEndpoint.getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
Exception receivedException = (Exception) mockEndpoint.getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT);
Assert.assertTrue(receivedException instanceof Exception);
Assert.assertEquals(receivedException.getMessage(), ERROR_MESSAGE);
}
}
@ShellDragon 回答的一些额外信息。在调试您的示例期间,我发现了一件有趣的事情。您的示例在 camel 3 中不起作用,因为 SendProcessor 丢失了部分代码(doStart 方法):
// the destination could since have been intercepted by a interceptSendToEndpoint so we got to
// lookup this before we can use the destination
Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey());
if (lookup instanceof InterceptSendToEndpoint) {
if (log.isDebugEnabled()) {
log.debug("Intercepted sending to {} -> {}",
URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri()));
}
destination = lookup;
}
2.x 中的目标 "direct:errors" 已被创建的拦截端点重写。
但是现在这段代码被标记为 "old cruft" 并被 @clausibsen 删除了。我怀疑这是一个错误,因为简单的 interceptSendToEndpoint 仍然有效。也许在使用 advicewith + 拦截器方面有变化。
这似乎与onException()
的使用有直接关系。显然在 Camel 3 中你不能再直接从 onException
进行拦截,所以将业务逻辑从异常块中移出到一个新的路由中允许拦截工作。
在我的例子中,这只需要在交换属性中保存相关的 onException
信息,然后在发出指标时可以引用这些信息。
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel3ErrorInterceptWorking : CamelTestSupport() {
val startUri = "direct:start"
val errorUri = "direct:errors"
val baseMetricsUri = "micrometer:counter:errors"
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(errorUri)
from(errorUri)
.to(fullMetricsUri) // Moved metrics here from `onException`
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `exception is routed to error logging route`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}
问题:
在从 Camel 2 迁移到 3 的过程中,我的错误路由测试失败了。
我遵循的模式是强制异常并断言 onException()
块使用适当的标签发送到我的指标路由。
我正在使用 uri 模式匹配来单独测试每个标记是否已发出...这对测试模式有很大影响
注意:在下面的两个示例中,createRouteBuilder()
方法是相同的
路过骆驼 2 示例
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel2Test : CamelTestSupport() {
val startUri = "direct:start"
val baseMetricsUri = "micrometer:counter:errors"
// Want to use pattern to test each individual tag here
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(fullMetricsUri)
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `metric with tag B is emitted`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
context.getRouteDefinition(startUri)
.adviceWith(context, object : RouteBuilder() {
override fun configure() {
interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
})
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}
失败的 Camel 3 示例
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel3Test : CamelTestSupport() {
val startUri = "direct:start"
val baseMetricsUri = "micrometer:counter:errors"
// Want to use pattern to test each individual tag here
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(fullMetricsUri)
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `metric with tag B is emitted`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}
mockEndpoint
未接收到交换,而是仍在转到指标端点。
问题:
在 Camel 3 中,我怎样才能像在 Camel 2 中那样拦截一条路线使用模式?手动测试显示错误路由在 prod 中的行为符合预期,因此这似乎是一个测试配置问题。
其他详情:
- This unit test from the camel repo 演示了我正在尝试做的事情,但是通过手动拦截路线而不是直接在路线中使用
mock:
。 当我不需要模式匹配时,这种替代方法就有效
override fun isMockEndpointsAndSkip() = myUri // ... in test getMockEndpoint("mock:$myUri").expectedMessageCount(1)
首先,非常感谢您提供了一个结构良好的问题和适当的代码示例!模拟组件的手册提到了一项功能的介绍 'Mocking Existing Endpoints' 很可能这就是阻止您的原因。我不太确定哪个版本的 Camel 引入了这个特性。
无论如何,要绕过当前的限制,您可以使用自动模拟功能本身。您的测试方法可以更改如下以使其正常工作。
@Test
fun `exception is routed to error logging route`() {
val exchange = createExchangeWithBody("")
// Create new mock endpoint that will replace our error route
val mockEndpoint = getMockEndpoint("mock:$errorUri")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.mockEndpoints(errorUri)
routeBuilder.interceptSendToEndpoint(errorUri)
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied()
}
对原始代码进行了两处更改。
- 模拟端点已从
mock:test
重命名,以符合自动生成的模拟端点类型 (mock:direct:errors
) - 调用
routeBuilder.mockEndpoints(errorUri)
以便 camel 可以自动注入 Mocks,用于errorUri
描述的模式
除此之外,还可以替换下面的方块
routeBuilder.mockEndpoints(errorUri)
routeBuilder.interceptSendToEndpoint(errorUri)
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
只有一个衬里 routeBuilder.mockEndpointsAndSkip(errorUri)
,除非有特定的理由使用 intercept
,正如您在问题中提到的那样。
补充观察:
运行 您的代码,没有任何更改,清楚地显示了 Mock 端点中的 RouteReifier
挂钩,mock://test
代替了 direct:errors
。此外,context
似乎也有一个合适的 endpointStrategy
。
这可能是一个错误。尽管有简单的替代方法,但请考虑在 ASF Jira 上也提出这个问题。
14:32:34.307 [main] INFO org.apache.camel.reifier.RouteReifier - Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
<from uri="direct:start"/>
<onException>
<exception>java.lang.Exception</exception>
<to uri="direct:errors"/>
</onException>
<throwException/>
</route>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
<from uri="direct:start"/>
<onException>
<exception>java.lang.Exception</exception>
<to uri="direct:errors"/>
</onException>
<interceptSendToEndpoint skipSendToOriginalEndpoint="true" uri="direct:errors">
<to uri="mock://test"/>
</interceptSendToEndpoint>
<throwException/>
</route>
测试通过IDE
Java实现(如果有人需要的话)
import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Assert;
import org.junit.Test;
public class Camel3RouteTest extends CamelTestSupport {
private static final String startUri = "direct:start";
private static final String errorUri = "direct:errors";
private static final String mockErrorURI = "mock:"+ errorUri;
private static final String ERROR_MESSAGE = "ERROR MESSAGE!";
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
onException(Exception.class)
.to(errorUri);
from(errorUri)
.routeId(errorUri)
.log("error happened!");
from(startUri)
.routeId(startUri)
.throwException(new Exception(ERROR_MESSAGE));
}
};
}
@Test
public void testExecution() throws Exception {
AdviceWithRouteBuilder.adviceWith(context, startUri, adviceWithRouteBuilder -> {
//a.mockEndpointsAndSkip(errorUri);
adviceWithRouteBuilder.mockEndpoints(errorUri);
adviceWithRouteBuilder.interceptSendToEndpoint(errorUri).skipSendToOriginalEndpoint().to(mockErrorURI);
});
MockEndpoint mockEndpoint = getMockEndpoint(mockErrorURI);
mockEndpoint.setExpectedMessageCount(1);
context.start();
sendBody(startUri, "A Test message");
assertMockEndpointsSatisfied();
Assert.assertNotNull(mockEndpoint.getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
Exception receivedException = (Exception) mockEndpoint.getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT);
Assert.assertTrue(receivedException instanceof Exception);
Assert.assertEquals(receivedException.getMessage(), ERROR_MESSAGE);
}
}
@ShellDragon 回答的一些额外信息。在调试您的示例期间,我发现了一件有趣的事情。您的示例在 camel 3 中不起作用,因为 SendProcessor 丢失了部分代码(doStart 方法):
// the destination could since have been intercepted by a interceptSendToEndpoint so we got to
// lookup this before we can use the destination
Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey());
if (lookup instanceof InterceptSendToEndpoint) {
if (log.isDebugEnabled()) {
log.debug("Intercepted sending to {} -> {}",
URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri()));
}
destination = lookup;
}
2.x 中的目标 "direct:errors" 已被创建的拦截端点重写。 但是现在这段代码被标记为 "old cruft" 并被 @clausibsen 删除了。我怀疑这是一个错误,因为简单的 interceptSendToEndpoint 仍然有效。也许在使用 advicewith + 拦截器方面有变化。
这似乎与onException()
的使用有直接关系。显然在 Camel 3 中你不能再直接从 onException
进行拦截,所以将业务逻辑从异常块中移出到一个新的路由中允许拦截工作。
在我的例子中,这只需要在交换属性中保存相关的 onException
信息,然后在发出指标时可以引用这些信息。
import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit
class Camel3ErrorInterceptWorking : CamelTestSupport() {
val startUri = "direct:start"
val errorUri = "direct:errors"
val baseMetricsUri = "micrometer:counter:errors"
val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"
override fun isUseAdviceWith(): Boolean {
return true
}
override fun createRouteBuilder(): RoutesBuilder {
return object : RouteBuilder() {
override fun configure() {
onException(Exception::class.java)
.to(errorUri)
from(errorUri)
.to(fullMetricsUri) // Moved metrics here from `onException`
from(startUri)
.routeId(startUri)
.throwException(Exception())
}
}
}
@Test
fun `exception is routed to error logging route`() {
val exchange = createExchangeWithBody("")
val mockEndpoint = getMockEndpoint("mock:test")
AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
.skipSendToOriginalEndpoint()
.to(mockEndpoint)
}
context.start()
mockEndpoint.expectedMessageCount(1)
template.send(startUri, exchange)
assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
}
}