Camel 3: How to intercept a route from `onException` using `interceptSendToEndpoint`

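Problem:

While migrating from Camel 2 to Camel 3, my error-route tests started failing.

The pattern I follow is to force an exception and assert that the onException() block sends to my metrics route with the appropriate tags.

I use URI pattern matching to test individually that each tag is emitted... this matters a lot for how the tests are structured.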
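As a rough illustration of the per-tag pattern idea, here is a minimal sketch that uses plain java.util.regex rather than Camel's own endpoint matcher (which, per the intercept documentation, also tries exact and wildcard matches, and may normalize the URI before matching). The class name UriPatternSketch, the helper matchesRegex, and the "c=3" tag are made up for this example.

```java
import java.util.regex.Pattern;

// Illustration only: plain regex matching, not Camel's endpoint matcher.
final class UriPatternSketch {

    // Returns true when the whole URI matches the regular expression.
    static boolean matchesRegex(String uri, String regex) {
        return Pattern.matches(regex, uri);
    }

    public static void main(String[] args) {
        String baseMetricsUri = "micrometer:counter:errors";
        String fullMetricsUri = baseMetricsUri + "?tags=a=1,b=2";

        // One pattern per tag, so each tag can be asserted in its own test.
        String tagA = baseMetricsUri + ".*a.*1.*";
        String tagB = baseMetricsUri + ".*b.*2.*";
        String tagC = baseMetricsUri + ".*c.*3.*"; // hypothetical tag that is never emitted

        System.out.println(matchesRegex(fullMetricsUri, tagA)); // true
        System.out.println(matchesRegex(fullMetricsUri, tagB)); // true
        System.out.println(matchesRegex(fullMetricsUri, tagC)); // false
    }
}
```

Each test can then intercept with only the pattern for the tag it cares about, without knowing the full tag list.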

Note: the createRouteBuilder() method is identical in the two examples below.

Passing Camel 2 example

import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit

class Camel2Test : CamelTestSupport() {

    val startUri = "direct:start"
    val baseMetricsUri = "micrometer:counter:errors"
    // Want to use pattern to test each individual tag here
    val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"

    override fun isUseAdviceWith(): Boolean {
        return true
    }

    override fun createRouteBuilder(): RoutesBuilder {
        return object : RouteBuilder() {
            override fun configure() {

                onException(Exception::class.java)
                    .to(fullMetricsUri)

                from(startUri)
                    .routeId(startUri)
                    .throwException(Exception())
            }

        }
    }

    @Test
    fun `metric with tag B is emitted`() {
        val exchange = createExchangeWithBody("")

        val mockEndpoint = getMockEndpoint("mock:test")

        context.getRouteDefinition(startUri)
            .adviceWith(context, object : RouteBuilder() {
                override fun configure() {
                    interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
                        .skipSendToOriginalEndpoint()
                        .to(mockEndpoint)
                }
            })

        context.start()

        mockEndpoint.expectedMessageCount(1)

        template.send(startUri, exchange)

        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
    }
}

Failing Camel 3 example

import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit

class Camel3Test : CamelTestSupport() {

    val startUri = "direct:start"
    val baseMetricsUri = "micrometer:counter:errors"
    // Want to use pattern to test each individual tag here
    val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"

    override fun isUseAdviceWith(): Boolean {
        return true
    }

    override fun createRouteBuilder(): RoutesBuilder {
        return object : RouteBuilder() {
            override fun configure() {

                onException(Exception::class.java)
                    .to(fullMetricsUri)

                from(startUri)
                    .routeId(startUri)
                    .throwException(Exception())
            }

        }
    }

    @Test
    fun `metric with tag B is emitted`() {
        val exchange = createExchangeWithBody("")

        val mockEndpoint = getMockEndpoint("mock:test")

        AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
            routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
                .skipSendToOriginalEndpoint()
                .to(mockEndpoint)
        }

        context.start()

        mockEndpoint.expectedMessageCount(1)

        template.send(startUri, exchange)

        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
    }
}

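The mockEndpoint does not receive the exchange; it still goes to the metrics endpoint instead.

Question:

In Camel 3, how can I intercept a route using a pattern, as I did in Camel 2? Manual testing shows that the error routing behaves as expected in prod, so this looks like a test configuration issue.

Other details: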

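First of all, thank you for a well-structured question with proper code examples! The Mock component's manual mentions the introduction of a feature called 'Mocking Existing Endpoints', and most likely this is what is blocking you. I am not sure which version of Camel introduced this feature.

In any case, to work around the current limitation you can use the auto-mocking feature itself. Your test method can be changed as follows to make it work.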

    @Test
    fun `exception is routed to error logging route`() {
        val exchange = createExchangeWithBody("")

        // Create new mock endpoint that will replace our error route
        val mockEndpoint = getMockEndpoint("mock:$errorUri") 

        AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
            routeBuilder.mockEndpoints(errorUri) 
            routeBuilder.interceptSendToEndpoint(errorUri)
                    .skipSendToOriginalEndpoint()
                    .to(mockEndpoint)
        }

        context.start()

        mockEndpoint.expectedMessageCount(1)

        template.send(startUri, exchange)

        assertMockEndpointsSatisfied()
    }

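Two changes were made to the original code.

  1. The mock endpoint has been renamed from mock:test to comply with the auto-generated mock endpoint pattern (mock:direct:errors)
  2. routeBuilder.mockEndpoints(errorUri) is called so that Camel can automatically inject mocks for the pattern described by errorUri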
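The auto-generated mock name can be approximated with a small helper. This is a sketch of the naming rule as I understand it from the Mock component documentation ("mock:" prepended, URI parameters stripped), not Camel's actual implementation; MockUriSketch and mockUriFor are hypothetical names.

```java
// Illustration only: approximates how auto-mocked endpoints are named.
final class MockUriSketch {

    // Hypothetical helper: prepend "mock:" and drop any query parameters.
    static String mockUriFor(String endpointUri) {
        int query = endpointUri.indexOf('?');
        String withoutParams = query >= 0 ? endpointUri.substring(0, query) : endpointUri;
        // Assumption: "scheme://path" is shortened to "scheme:path" in the mock name.
        return "mock:" + withoutParams.replaceFirst("://", ":");
    }

    public static void main(String[] args) {
        System.out.println(mockUriFor("direct:errors"));        // mock:direct:errors
        System.out.println(mockUriFor("log:foo?showAll=true")); // mock:log:foo
    }
}
```

If this rule holds, an auto-mocked endpoint with query parameters loses them in the mock name, which would explain why a test that needs to distinguish individual tags may still want a pattern-based intercept.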

In addition, the block below can be replaced

  routeBuilder.mockEndpoints(errorUri)
  routeBuilder.interceptSendToEndpoint(errorUri)
          .skipSendToOriginalEndpoint()
          .to(mockEndpoint)

with a one-liner, routeBuilder.mockEndpointsAndSkip(errorUri), unless there is a specific reason to use intercept, as you mentioned in your question.

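Additional observations:

Running your code without any changes, the log clearly shows RouteReifier hooking in the mock endpoint: mock://test is injected in place of direct:errors. The context also appears to have a proper endpointStrategy.

This may be a bug. Although there are simple alternatives, please consider raising it on the ASF Jira as well.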

14:32:34.307 [main] INFO org.apache.camel.reifier.RouteReifier - Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
    <from uri="direct:start"/>
    <onException>
        <exception>java.lang.Exception</exception>
        <to uri="direct:errors"/>
    </onException>
    <throwException/>
</route>

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="direct:start">
    <from uri="direct:start"/>
    <onException>
        <exception>java.lang.Exception</exception>
        <to uri="direct:errors"/>
    </onException>
    <interceptSendToEndpoint skipSendToOriginalEndpoint="true" uri="direct:errors">
        <to uri="mock://test"/>
    </interceptSendToEndpoint>
    <throwException/>
</route>

The test passes when run from the IDE.

Java implementation (in case anyone needs it)


import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Assert;
import org.junit.Test;

public class Camel3RouteTest extends CamelTestSupport {

    private static final String startUri = "direct:start";
    private static final String errorUri = "direct:errors";
    private static final String mockErrorURI = "mock:" + errorUri;
    private static final String ERROR_MESSAGE = "ERROR MESSAGE!";

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                onException(Exception.class)
                        .to(errorUri);

                from(errorUri)
                        .routeId(errorUri)
                        .log("error happened!");

                from(startUri)
                        .routeId(startUri)
                        .throwException(new Exception(ERROR_MESSAGE));

            }
        };
    }

    @Test
    public void testExecution() throws Exception {

        AdviceWithRouteBuilder.adviceWith(context, startUri, adviceWithRouteBuilder -> {
            // Alternative one-liner: adviceWithRouteBuilder.mockEndpointsAndSkip(errorUri);

            adviceWithRouteBuilder.mockEndpoints(errorUri);
            adviceWithRouteBuilder.interceptSendToEndpoint(errorUri).skipSendToOriginalEndpoint().to(mockErrorURI);
        });

        MockEndpoint mockEndpoint = getMockEndpoint(mockErrorURI);
        mockEndpoint.setExpectedMessageCount(1);

        context.start();
        sendBody(startUri, "A Test message");
        assertMockEndpointsSatisfied();

        // The exception caught by onException is stored as an exchange property
        Exception receivedException = mockEndpoint.getExchanges().get(0)
                .getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);

        Assert.assertNotNull(receivedException);
        // JUnit's assertEquals takes the expected value first
        Assert.assertEquals(ERROR_MESSAGE, receivedException.getMessage());
    }


}

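Some additional information on @ShellDragon's answer. While debugging your example I found something interesting. Your example does not work in Camel 3 because SendProcessor lost part of its code (in the doStart method):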

    // the destination could since have been intercepted by a interceptSendToEndpoint so we got to
    // lookup this before we can use the destination
    Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey());
    if (lookup instanceof InterceptSendToEndpoint) {
        if (log.isDebugEnabled()) {
            log.debug("Intercepted sending to {} -> {}",
                    URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri()));
        }
        destination = lookup;
    }

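In 2.x the destination "direct:errors" was rewritten to the created intercept endpoint. But this code has now been marked as "old cruft" and removed by @clausibsen. I suspect this is a bug, because a simple interceptSendToEndpoint still works. Perhaps something changed in how adviceWith and interceptors work together.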
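To make the suspected mechanism concrete, here is a toy model, assuming the explanation above is right. It is not Camel code: Registry, Sender, and startWithLookup are hypothetical stand-ins for the endpoint registry, SendProcessor, and the removed lookup in doStart, and endpoints are reduced to plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: shows why re-resolving the destination at start matters.
final class InterceptLookupSketch {

    // Stand-in for the context's endpoint registry (hypothetical).
    static final class Registry {
        private final Map<String, String> endpoints = new HashMap<>();

        void register(String key, String target) { endpoints.put(key, target); }

        String lookup(String key) { return endpoints.get(key); }
    }

    // Stand-in for a sender that captured its destination when the route was built.
    static final class Sender {
        private final String key;
        private String destination;

        Sender(String key, String destination) {
            this.key = key;
            this.destination = destination;
        }

        // Mirrors the removed lookup: re-resolve the destination before use.
        void startWithLookup(Registry registry) {
            String lookup = registry.lookup(key);
            if (lookup != null) {
                destination = lookup;
            }
        }

        String destination() { return destination; }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("direct:errors", "direct:errors");

        Sender withLookup = new Sender("direct:errors", registry.lookup("direct:errors"));
        Sender withoutLookup = new Sender("direct:errors", registry.lookup("direct:errors"));

        // Advice applied afterwards swaps the registry entry for an intercepting wrapper.
        registry.register("direct:errors", "intercept(direct:errors -> mock:test)");

        withLookup.startWithLookup(registry);
        System.out.println(withLookup.destination());    // intercept(direct:errors -> mock:test)
        System.out.println(withoutLookup.destination()); // direct:errors
    }
}
```

In this model the sender that skips the lookup keeps sending to the original endpoint, which matches the symptom in the question.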

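This appears to be directly related to the use of onException(). Apparently in Camel 3 you can no longer intercept directly from onException, so moving the business logic out of the exception block and into a new route allows the intercept to work.

In my case, this only required saving the relevant onException information in exchange properties, which can then be referenced when the metric is emitted.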

import org.apache.camel.RoutesBuilder
import org.apache.camel.builder.AdviceWithRouteBuilder
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.test.junit4.CamelTestSupport
import org.junit.Test
import java.util.concurrent.TimeUnit

class Camel3ErrorInterceptWorking : CamelTestSupport() {

    val startUri = "direct:start"
    val errorUri = "direct:errors"
    val baseMetricsUri = "micrometer:counter:errors"
    val fullMetricsUri = "$baseMetricsUri?tags=a=1,b=2"

    override fun isUseAdviceWith(): Boolean {
        return true
    }

    override fun createRouteBuilder(): RoutesBuilder {
        return object : RouteBuilder() {
            override fun configure() {

                onException(Exception::class.java)
                    .to(errorUri)

                from(errorUri)
                    .to(fullMetricsUri) // Moved metrics here from `onException`

                from(startUri)
                    .routeId(startUri)
                    .throwException(Exception())
            }

        }
    }

    @Test
    fun `exception is routed to error logging route`() {
        val exchange = createExchangeWithBody("")

        val mockEndpoint = getMockEndpoint("mock:test")

        AdviceWithRouteBuilder.adviceWith(context, startUri) { routeBuilder ->
            routeBuilder.interceptSendToEndpoint("$baseMetricsUri.*b.*2.*") // <-- PATTERN
                .skipSendToOriginalEndpoint()
                .to(mockEndpoint)
        }

        context.start()

        mockEndpoint.expectedMessageCount(1)

        template.send(startUri, exchange)

        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS)
    }
}