队列中没有消息时停止 Camel 路由(jms、seda)

Stopping Camel route when no messages in queue (jms, seda)

我有一个 Camel 路由,用于将消息从一个 jms 队列移动到另一个队列。该路由默认处于停止状态,需要通过调用一个使用 controlBus 的 Jetty 路由来启动。

由于我需要按需移动消息,一旦源 jms 队列为空,就需要停用“mover”路由,这样在再次激活“mover”路由之前,稍后到达的消息不会被处理。

有办法实现吗?

您可以尝试使用 JmsTemplate 获取队列中的消息数量,然后在处理器中从另一个线程关闭该路由。缺点是会引入对 org.springframework.jms.core.JmsTemplate 的依赖,以及与之相关的一些陷阱(gotchas)。

package com.example;

import java.util.Collections;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;

/**
 * Demonstrates stopping a JMS-consuming Camel route once its source queue is empty.
 *
 * <p>The {@code queueListener} route is defined with auto-startup disabled and is started
 * explicitly by the test. After forwarding each message it browses the {@code test} queue
 * through a {@link JmsTemplate}; when the browse reports zero remaining messages the exchange
 * is handed to {@code seda:stopPolling}, whose route asks the route controller to stop
 * {@code queueListener} and then waits until the route reports itself as stopped.
 */
public class QueueConsumerTests extends CamelTestSupport {

    /** Upper bound, in milliseconds, on how long the test waits for the route to stop. */
    private static final long STOP_TIMEOUT_MILLIS = 5000L;

    /** Pause, in milliseconds, between two consecutive checks of the route status. */
    private static final long STOP_POLL_INTERVAL_MILLIS = 100L;

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the listener route and verifies that all five reach the
     * mock endpoint and that the route ends up stopped.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {

        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }
        context().getRouteController().startRoute("queueListener");

        // Poll instead of sleeping for a fixed time: returns as soon as the route has stopped,
        // and never waits longer than the fixed sleep this replaces.
        awaitRouteStopped("queueListener");

        // Expected value goes first so that a failure message reads the right way round.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus("queueListener"));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Blocks until the given route reports a stopped status or the timeout elapses.
     * Returns silently on timeout; the caller's assertion then reports the actual status.
     *
     * @param routeId id of the route to watch
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId) throws InterruptedException {
        long deadline = System.currentTimeMillis() + STOP_TIMEOUT_MILLIS;
        while (System.currentTimeMillis() < deadline) {
            ServiceStatus status = context().getRouteStatus(routeId);
            if (status != null && status.isStopped()) {
                return;
            }
            Thread.sleep(STOP_POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Builds three routes: a feeder from {@code direct:test} into the JMS queue, the
     * manually started {@code queueListener} consumer, and the {@code seda:stopPolling}
     * route that stops the consumer.
     *
     * @return the route definitions used by this test
     * @throws Exception declared by the overridden method
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {

        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Feeder: lets the test put messages on the queue.
                from("direct:test")
                    .to("jms:queue:test");

                // Consumer: not started automatically, the test starts it on demand.
                from("jms:queue:test")
                    .routeId("queueListener")
                    .autoStartup(false)
                    .log("message from queue: ${body}")
                    .to("mock:jmsMockEndpoint")
                    // Replace the body with the number of messages still on the queue.
                    .setBody().exchange(e -> {

                        int messageCount = jmsTemplate.browse("test", (session, browser) -> {
                            return Collections.list(browser.getEnumeration()).size();
                        });
                        return messageCount;
                    })
                    // Queue drained: ask the separate seda route to stop this one.
                    .filter(body().isEqualTo(0))
                        .to("seda:stopPolling")
                    .end();

                // Stopper: requests the stop, then loops with a 100 ms delay until the
                // consumer route reports that it is stopped.
                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute("queueListener"))
                    .setProperty("stopped").constant(false)
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus("queueListener").isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a context whose {@code jms} component uses this test's connection factory.
     *
     * @return the Camel context used by the test
     * @throws Exception declared by the overridden method
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {

        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}

另一种方案是使用 timer 配合 poll-enrich:当轮询超时、得到空消息体时关闭路由。但是,由于需要指定轮询频率和超时时间,这种方式速度较慢,也可能不太可靠。

package com.example;

import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;

/**
 * Demonstrates stopping a timer-driven Camel route once polling the source queue yields
 * nothing.
 *
 * <p>The {@code queueListener} route is defined with auto-startup disabled and is started
 * explicitly by the test. On every timer tick it poll-enriches from the {@code test} queue
 * with a 1000 ms timeout; a non-null body is forwarded to the mock endpoint, anything else
 * is handed to {@code seda:stopPolling}, whose route asks the route controller to stop
 * {@code queueListener} and then waits until the route reports itself as stopped.
 */
public class QueueConsumerTests2 extends CamelTestSupport {

    /** Upper bound, in milliseconds, on how long the test waits for the route to stop. */
    private static final long STOP_TIMEOUT_MILLIS = 10000L;

    /** Pause, in milliseconds, between two consecutive checks of the route status. */
    private static final long STOP_POLL_INTERVAL_MILLIS = 100L;

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the polling route and verifies that all five reach the
     * mock endpoint and that the route ends up stopped.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {

        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }
        context().getRouteController().startRoute("queueListener");

        // Poll instead of sleeping for a fixed time: returns as soon as the route has stopped,
        // and never waits longer than the fixed sleep this replaces.
        awaitRouteStopped("queueListener");

        // Expected value goes first so that a failure message reads the right way round.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus("queueListener"));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Blocks until the given route reports a stopped status or the timeout elapses.
     * Returns silently on timeout; the caller's assertion then reports the actual status.
     *
     * @param routeId id of the route to watch
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId) throws InterruptedException {
        long deadline = System.currentTimeMillis() + STOP_TIMEOUT_MILLIS;
        while (System.currentTimeMillis() < deadline) {
            ServiceStatus status = context().getRouteStatus(routeId);
            if (status != null && status.isStopped()) {
                return;
            }
            Thread.sleep(STOP_POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Builds three routes: a feeder from {@code direct:test} into the JMS queue, the
     * manually started timer-driven {@code queueListener} poller, and the
     * {@code seda:stopPolling} route that stops the poller.
     *
     * @return the route definitions used by this test
     * @throws Exception declared by the overridden method
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {

        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                // Feeder: lets the test put messages on the queue.
                from("direct:test")
                    .to("jms:queue:test");

                // Poller: not started automatically, the test starts it on demand.
                from("timer:moverTimer?period=500")
                    .routeId("queueListener")
                    .autoStartup(false)
                    .pollEnrich("jms:queue:test", 1000)
                    .choice()
                        .when(body().isNotNull())
                            .log("message from queue: ${body}")
                            .to("mock:jmsMockEndpoint")
                        .otherwise()
                            // No body after the poll: ask the seda route to stop this one.
                            .to("seda:stopPolling")
                    .end();

                // Stopper: requests the stop, then loops with a 100 ms delay until the
                // poller route reports that it is stopped.
                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute("queueListener"))
                    .setProperty("stopped").constant(false)
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus("queueListener").isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a context whose {@code jms} component uses this test's connection factory.
     *
     * @return the Camel context used by the test
     * @throws Exception declared by the overridden method
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {

        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}

免责声明:我尚未彻底测试这些模式,因此可能存在一些边缘情况(也可能有更好的方法)。另外,Camel 官方文档是在处理器内部使用 java.lang.Thread 来停止路由,而不是使用 seda 消费者,这样做或许有其道理。

在单元测试中使用 Thread.sleep 也不够优雅,除非只是想用 Camel 快速做些实验,否则我不推荐这样做。

使用的依赖项:

  • org.apache.camel/camel-core/2.24.2
  • org.apache.camel/camel-jms/2.24.2

测试范围:

  • org.apache.camel/camel-test/2.24.2
  • org.apache.activemq.tooling/activemq-junit/5.16.3
  • org.apache.activemq/activemq-broker/5.16.3