队列中没有消息时停止 Camel 路由(jms、seda)
Stopping Camel route when no messages in queue (jms, seda)
我有一个 Camel 路由,用于把消息从一个 JMS 队列移动到另一个队列。该路由默认处于停止状态,由一个 Jetty 路由通过 controlBus 调用来启动。
由于我需要按需移动消息,一旦源 JMS 队列为空,就需要停用这个“mover”路由,这样在它被再次激活之前,之后到达的消息都不会被处理。
有办法实现吗?
您可以尝试使用 JmsTemplate 获取队列中的消息数量,然后通过处理器从另一个线程停止该路由。缺点可能是需要依赖 org.springframework.jms.core.JmsTemplate,以及与之相关的一些陷阱(gotchas)。
package com.example;
import java.util.Collections;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
/**
 * Demonstrates stopping a JMS-consuming Camel route once its source queue has been drained.
 *
 * <p>After every consumed message the {@code queueListener} route browses the {@code test}
 * queue through {@link JmsTemplate}. When the browse reports zero remaining messages, the
 * exchange is handed to the {@code seda:stopPolling} route, which stops {@code queueListener}
 * so the stop request is issued by the SEDA consumer rather than by the JMS consumer itself.
 */
public class QueueConsumerTests extends CamelTestSupport {

    /** Id of the route that is started on demand and must stop itself when the queue is empty. */
    private static final String LISTENER_ROUTE_ID = "queueListener";

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the listener route and verifies that all five messages are
     * delivered and that the route ends up in the {@link ServiceStatus#Stopped} state.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {
        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }

        context().getRouteController().startRoute(LISTENER_ROUTE_ID);

        // Wait at most as long as the former fixed sleep, but return as soon as the route stops.
        awaitRouteStopped(LISTENER_ROUTE_ID, 5000);

        // JUnit convention: expected value first, actual value second.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus(LISTENER_ROUTE_ID));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Polls the route status until it reports stopped or the timeout elapses.
     *
     * <p>Returns silently on timeout; the caller's assertion decides whether the test fails.
     *
     * @param routeId id of the route to watch
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!context().getRouteStatus(routeId).isStopped()
                && System.nanoTime() - deadline < 0) {
            Thread.sleep(50);
        }
    }

    /**
     * Builds the three routes used by the test: a producer feeding the queue, the on-demand
     * queue listener, and the SEDA route that stops the listener.
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:test")
                    .to("jms:queue:test");

                from("jms:queue:test")
                    .routeId(LISTENER_ROUTE_ID)
                    .autoStartup(false)
                    .log("message from queue: ${body}")
                    .to("mock:jmsMockEndpoint")
                    // Replace the body with the number of messages the queue browser still sees.
                    .setBody().exchange(e -> {
                        int messageCount = jmsTemplate.browse("test", (session, browser) -> {
                            return Collections.list(browser.getEnumeration()).size();
                        });
                        return messageCount;
                    })
                    // Only an empty queue triggers the stop request.
                    .filter(body().isEqualTo(0))
                        .to("seda:stopPolling")
                    .end();

                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute(LISTENER_ROUTE_ID))
                    .setProperty("stopped").constant(false)
                    // Re-check every 100 ms until the listener route reports it is stopped.
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus(LISTENER_ROUTE_ID).isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a Camel context whose {@code jms} component uses the embedded broker's
     * connection factory.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}
另一种方案是使用 timer 和 poll-enrich:如果轮询超时、得到的消息体为空,就关闭路由。但是,由于需要指定轮询频率和超时时间,这种方式速度较慢,而且可能不太可靠。
package com.example;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;
/**
 * Demonstrates stopping a timer-driven Camel route once polling its source JMS queue yields
 * nothing.
 *
 * <p>The {@code queueListener} route fires from a timer and uses {@code pollEnrich} on the
 * {@code test} queue. A non-null body is logged and forwarded to the mock endpoint; a null
 * body is handed to the {@code seda:stopPolling} route, which stops {@code queueListener}.
 */
public class QueueConsumerTests2 extends CamelTestSupport {

    /** Id of the route that is started on demand and must stop itself when the queue is empty. */
    private static final String LISTENER_ROUTE_ID = "queueListener";

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the listener route and verifies that all five messages are
     * delivered and that the route ends up in the {@link ServiceStatus#Stopped} state.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {
        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }

        context().getRouteController().startRoute(LISTENER_ROUTE_ID);

        // Wait at most as long as the former fixed sleep, but return as soon as the route stops.
        awaitRouteStopped(LISTENER_ROUTE_ID, 10000);

        // JUnit convention: expected value first, actual value second.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus(LISTENER_ROUTE_ID));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Polls the route status until it reports stopped or the timeout elapses.
     *
     * <p>Returns silently on timeout; the caller's assertion decides whether the test fails.
     *
     * @param routeId id of the route to watch
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!context().getRouteStatus(routeId).isStopped()
                && System.nanoTime() - deadline < 0) {
            Thread.sleep(50);
        }
    }

    /**
     * Builds the three routes used by the test: a producer feeding the queue, the timer-driven
     * queue poller, and the SEDA route that stops the poller.
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:test")
                    .to("jms:queue:test");

                from("timer:moverTimer?period=500")
                    .routeId(LISTENER_ROUTE_ID)
                    .autoStartup(false)
                    // Poll the queue with a 1000 ms timeout on every timer tick.
                    .pollEnrich("jms:queue:test", 1000)
                    .choice()
                        .when(body().isNotNull())
                            .log("message from queue: ${body}")
                            .to("mock:jmsMockEndpoint")
                        .otherwise()
                            // A null body is treated as "queue is empty": request the stop.
                            .to("seda:stopPolling")
                    .end();

                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute(LISTENER_ROUTE_ID))
                    .setProperty("stopped").constant(false)
                    // Re-check every 100 ms until the listener route reports it is stopped.
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus(LISTENER_ROUTE_ID).isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a Camel context whose {@code jms} component uses the embedded broker's
     * connection factory.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}
免责声明:这些模式尚未经过彻底测试,因此可能存在一些边缘情况(也可能有更好的做法)。另外,Camel 官方文档是在处理器内部使用 java.lang.Thread 来停止路由,而不是使用 seda 消费者,这样做可能自有其原因。
在单元测试中使用 Thread.sleep 也不是好的做法,我不推荐这样做,除非你只是想用 Camel 快速试验一些东西。
使用的依赖项:
- org.apache.camel/camel-core/2.24.2
- org.apache.camel/camel-jms/2.24.2
测试范围:
- org.apache.camel/camel-test/2.24.2
- org.apache.activemq.tooling/activemq-junit/5.16.3
- org.apache.activemq/activemq-broker/5.16.3
我有一个 Camel 路由,用于把消息从一个 JMS 队列移动到另一个队列。该路由默认处于停止状态,由一个 Jetty 路由通过 controlBus 调用来启动。
由于我需要按需移动消息,一旦源 JMS 队列为空,就需要停用这个“mover”路由,这样在它被再次激活之前,之后到达的消息都不会被处理。
有办法实现吗?
您可以尝试使用 JmsTemplate 获取队列中的消息数量,然后通过处理器从另一个线程停止该路由。缺点可能是需要依赖 org.springframework.jms.core.JmsTemplate,以及与之相关的一些陷阱(gotchas)。
package com.example;
import java.util.Collections;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
/**
 * Demonstrates stopping a JMS-consuming Camel route once its source queue has been drained.
 *
 * <p>After every consumed message the {@code queueListener} route browses the {@code test}
 * queue through {@link JmsTemplate}. When the browse reports zero remaining messages, the
 * exchange is handed to the {@code seda:stopPolling} route, which stops {@code queueListener}
 * so the stop request is issued by the SEDA consumer rather than by the JMS consumer itself.
 */
public class QueueConsumerTests extends CamelTestSupport {

    /** Id of the route that is started on demand and must stop itself when the queue is empty. */
    private static final String LISTENER_ROUTE_ID = "queueListener";

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the listener route and verifies that all five messages are
     * delivered and that the route ends up in the {@link ServiceStatus#Stopped} state.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {
        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }

        context().getRouteController().startRoute(LISTENER_ROUTE_ID);

        // Wait at most as long as the former fixed sleep, but return as soon as the route stops.
        awaitRouteStopped(LISTENER_ROUTE_ID, 5000);

        // JUnit convention: expected value first, actual value second.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus(LISTENER_ROUTE_ID));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Polls the route status until it reports stopped or the timeout elapses.
     *
     * <p>Returns silently on timeout; the caller's assertion decides whether the test fails.
     *
     * @param routeId id of the route to watch
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!context().getRouteStatus(routeId).isStopped()
                && System.nanoTime() - deadline < 0) {
            Thread.sleep(50);
        }
    }

    /**
     * Builds the three routes used by the test: a producer feeding the queue, the on-demand
     * queue listener, and the SEDA route that stops the listener.
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:test")
                    .to("jms:queue:test");

                from("jms:queue:test")
                    .routeId(LISTENER_ROUTE_ID)
                    .autoStartup(false)
                    .log("message from queue: ${body}")
                    .to("mock:jmsMockEndpoint")
                    // Replace the body with the number of messages the queue browser still sees.
                    .setBody().exchange(e -> {
                        int messageCount = jmsTemplate.browse("test", (session, browser) -> {
                            return Collections.list(browser.getEnumeration()).size();
                        });
                        return messageCount;
                    })
                    // Only an empty queue triggers the stop request.
                    .filter(body().isEqualTo(0))
                        .to("seda:stopPolling")
                    .end();

                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute(LISTENER_ROUTE_ID))
                    .setProperty("stopped").constant(false)
                    // Re-check every 100 ms until the listener route reports it is stopped.
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus(LISTENER_ROUTE_ID).isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a Camel context whose {@code jms} component uses the embedded broker's
     * connection factory.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}
另一种方案是使用 timer 和 poll-enrich:如果轮询超时、得到的消息体为空,就关闭路由。但是,由于需要指定轮询频率和超时时间,这种方式速度较慢,而且可能不太可靠。
package com.example;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Rule;
import org.junit.Test;
/**
 * Demonstrates stopping a timer-driven Camel route once polling its source JMS queue yields
 * nothing.
 *
 * <p>The {@code queueListener} route fires from a timer and uses {@code pollEnrich} on the
 * {@code test} queue. A non-null body is logged and forwarded to the mock endpoint; a null
 * body is handed to the {@code seda:stopPolling} route, which stops {@code queueListener}.
 */
public class QueueConsumerTests2 extends CamelTestSupport {

    /** Id of the route that is started on demand and must stop itself when the queue is empty. */
    private static final String LISTENER_ROUTE_ID = "queueListener";

    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");

    /**
     * Queues five messages, starts the listener route and verifies that all five messages are
     * delivered and that the route ends up in the {@link ServiceStatus#Stopped} state.
     *
     * @throws Exception if sending, starting the route, or waiting fails
     */
    @Test
    public void stopesQueueListenerRouteAfterConsumingAllMessages() throws Exception {
        MockEndpoint jmsMockEndpoint = getMockEndpoint("mock:jmsMockEndpoint");
        jmsMockEndpoint.expectedMessageCount(5);

        for (int i = 1; i <= 5; i++) {
            template.sendBody("direct:test", "Message " + i);
        }

        context().getRouteController().startRoute(LISTENER_ROUTE_ID);

        // Wait at most as long as the former fixed sleep, but return as soon as the route stops.
        awaitRouteStopped(LISTENER_ROUTE_ID, 10000);

        // JUnit convention: expected value first, actual value second.
        assertEquals(ServiceStatus.Stopped, context().getRouteStatus(LISTENER_ROUTE_ID));
        jmsMockEndpoint.assertIsSatisfied();
    }

    /**
     * Polls the route status until it reports stopped or the timeout elapses.
     *
     * <p>Returns silently on timeout; the caller's assertion decides whether the test fails.
     *
     * @param routeId id of the route to watch
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitRouteStopped(String routeId, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!context().getRouteStatus(routeId).isStopped()
                && System.nanoTime() - deadline < 0) {
            Thread.sleep(50);
        }
    }

    /**
     * Builds the three routes used by the test: a producer feeding the queue, the timer-driven
     * queue poller, and the SEDA route that stops the poller.
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:test")
                    .to("jms:queue:test");

                from("timer:moverTimer?period=500")
                    .routeId(LISTENER_ROUTE_ID)
                    .autoStartup(false)
                    // Poll the queue with a 1000 ms timeout on every timer tick.
                    .pollEnrich("jms:queue:test", 1000)
                    .choice()
                        .when(body().isNotNull())
                            .log("message from queue: ${body}")
                            .to("mock:jmsMockEndpoint")
                        .otherwise()
                            // A null body is treated as "queue is empty": request the stop.
                            .to("seda:stopPolling")
                    .end();

                from("seda:stopPolling?concurrentConsumers=1&multipleConsumers=false")
                    .log("stop polling")
                    .process(e -> e.getContext().getRouteController().stopRoute(LISTENER_ROUTE_ID))
                    .setProperty("stopped").constant(false)
                    // Re-check every 100 ms until the listener route reports it is stopped.
                    .loopDoWhile(exchangeProperty("stopped").isEqualTo(false))
                        .delay(100)
                        .setProperty("stopped").exchange(e -> {
                            return e.getContext().getRouteStatus(LISTENER_ROUTE_ID).isStopped();
                        })
                    .end()
                    .log("stopped queueListener");
            }
        };
    }

    /**
     * Creates a Camel context whose {@code jms} component uses the embedded broker's
     * connection factory.
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext context = new DefaultCamelContext();
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        context.addComponent("jms", jmsComponent);
        return context;
    }
}
免责声明:这些模式尚未经过彻底测试,因此可能存在一些边缘情况(也可能有更好的做法)。另外,Camel 官方文档是在处理器内部使用 java.lang.Thread 来停止路由,而不是使用 seda 消费者,这样做可能自有其原因。
在单元测试中使用 Thread.sleep 也不是好的做法,我不推荐这样做,除非你只是想用 Camel 快速试验一些东西。
使用的依赖项:
- org.apache.camel/camel-core/2.24.2
- org.apache.camel/camel-jms/2.24.2
测试范围:
- org.apache.camel/camel-test/2.24.2
- org.apache.activemq.tooling/activemq-junit/5.16.3
- org.apache.activemq/activemq-broker/5.16.3