Camel 作为 JMS 到 JMS 的桥接 - 未显示为 AMQ 消费者
Camel as a JMS to JMS bridge - Not showing as an AMQ consumer
我是 JMS 的新手,需要在 AMQ 和 WMQ 之间建立桥接,我在 Camel 文档中看到他们建议使用 Camel 而不是使用 JMS 到 JMS 桥接。
首先,我想让我的应用程序从 AMQ 中获取消息,并简单地记录一条日志表明它取到了消息。但是每当我在 Jetty 中启动应用程序时,它都不会在 apiToTopsQueue 上显示为消费者,因此也不会从队列中取走消息。
我的 application-context.xml(用于加载 Camel 上下文)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context for the bridge: loads the properties file, imports the Camel context
     and initialises Log4j. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Resolves property placeholders in the bean definitions from tops-bridge.properties on the classpath. -->
<context:property-placeholder location="classpath:tops-bridge.properties" />
<!-- Imports camel-context.xml from the classpath into this context. -->
<import resource="classpath:camel-context.xml" />
<!-- Invokes the static method Log4jConfigurer.initLogging with the two arguments listed below. -->
<bean id="log4jInitialization"
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetClass" value="org.springframework.util.Log4jConfigurer" />
<property name="targetMethod" value="initLogging" />
<property name="arguments">
<list>
<value>classpath:log4j.xml</value>
<value>60000</value> <!-- Refresh Log4j config every 60 seconds -->
</list>
</property>
</bean>
</beans>
然后这是 camel-context.xml(我一直在注释掉从各个教程里抄来的代码,所以它看起来可能有些奇怪)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Camel context for the AMQ/WMQ bridge: declares the route builder, the JMS connection beans,
     an embedded ActiveMQ broker and the "jms" Camel component. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Camel context whose routes are supplied by the bridgeRouteConfig bean defined below. -->
<camel:camelContext id="defaultCamelContext">
<camel:routeBuilder ref="bridgeRouteConfig"/>
<!--<camel:jmxAgent id="agent" createConnector="true"/>-->
</camel:camelContext>
<!-- Route builder; its four endpoint URIs are injected from properties by constructor-argument name. -->
<bean id="bridgeRouteConfig" class="com.caci.asg.rail.tops.bridge.TopsBridgeRouteBuilder">
<constructor-arg name="amqToBridgeQueue" value="${topsBridgeRouteBuilder.route.amqToBridgeRoute}"/>
<constructor-arg name="bridgeToWmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToWmqRoute}"/>
<constructor-arg name="wmqToBridgeQueue" value="${topsBridgeRouteBuilder.route.wmqToBridgeRoute}"/>
<constructor-arg name="bridgeToAmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToAmqRoute}"/>
</bean>
<!-- ActiveMQ connection factory pointing at the broker URL taken from application.activemq.url. -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${application.activemq.url}"/>
<property name="useAsyncSend" value="true"/>
</bean>
<!-- Connection pool (maxConnections = 8) wrapping jmsFactory; started and stopped with the context. -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
<!-- JMS configuration using the pooled factory with 10 concurrent consumers.
     NOTE(review): no bean in this file references jmsConfig, so the "jms" component below does not
     appear to use it (or the pool). Confirm whether a "configuration" property was intended there. -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="concurrentConsumers" value="10"/>
</bean>
<!-- Configure the embedded ActiveMQ JMS broker (JMX enabled, non-persistent).
     NOTE(review): the TCP connector below binds the same application.activemq.url that jmsFactory
     connects to. If a standalone broker already listens on that address this may conflict; verify. -->
<broker:broker useJmx="true" persistent="false" brokerName="myBroker">
<broker:transportConnectors>
<!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
<broker:transportConnector name="vm" uri="vm://myBroker"/>
<!-- expose a TCP transport for clients to use -->
<broker:transportConnector name="tcp" uri="${application.activemq.url}"/>
</broker:transportConnectors>
</broker:broker>
<!-- Configure the Camel ActiveMQ component to use the embedded ActiveMQ broker declared above.
     The bean id "jms" matches the scheme of the jms: endpoint URIs used by the routes. -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="vm://myBroker"/>
</bean>
</beans>
camel-context.xml 使用了如下一些属性(在我让 AMQ 工作之前,引用 wmq 的那些是占位符)。
application.activemq.url=tcp://localhost:61616
topsBridgeRouteBuilder.route.amqToBridgeRoute=jms:apiToTopsQueue
topsBridgeRouteBuilder.route.bridgeToWmqRoute=mq:toWmq
topsBridgeRouteBuilder.route.wmqToBridgeRoute=mq:fromWmq
topsBridgeRouteBuilder.route.bridgeToAmqRoute=jms:topsToApiQueue
Java 编写的路由构建器(RouteBuilder)如下
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
/**
 * Camel route definitions for the bridge between ActiveMQ (AMQ) and WebSphere MQ (WMQ).
 *
 * <p>Each constructor argument is a Camel endpoint URI (for example {@code jms:apiToTopsQueue}).
 * Only the AMQ inbound route is currently active: it consumes from {@code amqToBridgeQueue} and
 * logs each message. The two forwarding routes are kept as commented-out code in
 * {@link #configure()} until the WMQ side is wired up.
 */
public class TopsBridgeRouteBuilder extends RouteBuilder {

    private final String amqToBridgeQueue;
    private final String bridgeToWmqQueue;
    private final String wmqToBridgeQueue;
    private final String bridgeToAmqQueue;

    /**
     * Creates the route builder.
     *
     * @param amqToBridgeQueue endpoint URI the bridge consumes from on the AMQ side
     * @param bridgeToWmqQueue endpoint URI the bridge will forward AMQ messages to (not yet used)
     * @param wmqToBridgeQueue endpoint URI the bridge will consume from on the WMQ side (not yet used)
     * @param bridgeToAmqQueue endpoint URI the bridge will forward WMQ messages to (not yet used)
     */
    public TopsBridgeRouteBuilder(String amqToBridgeQueue, String bridgeToWmqQueue,
                                  String wmqToBridgeQueue, String bridgeToAmqQueue) {
        this.amqToBridgeQueue = amqToBridgeQueue;
        this.bridgeToWmqQueue = bridgeToWmqQueue;
        this.wmqToBridgeQueue = wmqToBridgeQueue;
        this.bridgeToAmqQueue = bridgeToAmqQueue;
    }

    /**
     * Registers the single active route: consume from the AMQ inbound endpoint and log at WARN.
     *
     * @throws Exception declared by {@link RouteBuilder#configure()}
     */
    @Override
    public void configure() throws Exception {
        // Forwarding routes, disabled until the WMQ endpoints are real:
        // from(amqToBridgeQueue).to(bridgeToWmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToWmqQueue);
        // from(wmqToBridgeQueue).to(bridgeToAmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToAmqQueue);

        // Trailing space after "from" so the endpoint URI is not glued onto the word in the log line.
        from(amqToBridgeQueue).log(LoggingLevel.WARN, "Consuming message from " + amqToBridgeQueue);
    }
}
所以我不太确定为什么在我启动 Jetty 时它没有被列为 apiToTopsQueue 的使用者。我的 pom 包含 AMQ/WMQ/camel 库的依赖项。依赖如下(从父pom继承version/scope)
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.jmqi</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mqjms</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>connector</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>dhbcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
我写了这个测试,用来验证我能否把消息放到 AMQ 队列上,再从 AMQ 上把它取下来 - 测试可以通过,并且在 AMQ 管理页面上我可以看到 enqueued/dequeued 消息计数在增加。
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import javax.jms.*;
public class TopsBridgeRouteBuilderTest {
@Test
public void testAMessageAddedToAmqCanBeRetrieved() throws JMSException {
String brokerURL = "tcp://localhost:61616";
String amqQueue = "apiToTopsQueue";
String messageToSend = "Test message";
// Put a message on the AMQ
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(amqQueue);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage tm = session.createTextMessage(messageToSend);
producer.send(tm);
// Create read only consumer to take the message off the queue
ActiveMQConnectionFactory connectionFactoryReadOnly = new ActiveMQConnectionFactory(brokerURL);
Connection connectionReadOnly = connectionFactoryReadOnly.createConnection();
connectionReadOnly.start();
Session sessionReadOnly = connectionReadOnly.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReadOnly.createConsumer(destination);
final TextMessage message = (TextMessage) consumer.receive();
System.out.println("Message retrieved = " + message.getText());
assertThat(message.getText(), is(messageToSend));
}
}
是不是我的配置有问题,导致我启动 Jetty 时 Camel 路由没有运行、也没有去监听 AMQ?
谢谢。
原来是我在 web.xml 中遗漏了下面这一部分。吸取教训,以后要注意!
<!-- Bootstraps the Spring root application context when the web application starts.
     Without this listener the Spring XML (and therefore the Camel context it imports) is never loaded.
     NOTE(review): by default it reads /WEB-INF/applicationContext.xml unless a contextConfigLocation
     context-param is set; confirm which applies in this web.xml. -->
<listener>
<listener-class>
org.springframework.web.context.ContextLoaderListener
</listener-class>
</listener>
我是 JMS 的新手,需要在 AMQ 和 WMQ 之间建立桥接,我在 Camel 文档中看到他们建议使用 Camel 而不是使用 JMS 到 JMS 桥接。
首先,我想让我的应用程序从 AMQ 中获取消息,并简单地记录一条日志表明它取到了消息。但是每当我在 Jetty 中启动应用程序时,它都不会在 apiToTopsQueue 上显示为消费者,因此也不会从队列中取走消息。
我的 application-context.xml(用于加载 Camel 上下文)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context for the bridge: loads the properties file, imports the Camel context
     and initialises Log4j. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Resolves property placeholders in the bean definitions from tops-bridge.properties on the classpath. -->
<context:property-placeholder location="classpath:tops-bridge.properties" />
<!-- Imports camel-context.xml from the classpath into this context. -->
<import resource="classpath:camel-context.xml" />
<!-- Invokes the static method Log4jConfigurer.initLogging with the two arguments listed below. -->
<bean id="log4jInitialization"
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetClass" value="org.springframework.util.Log4jConfigurer" />
<property name="targetMethod" value="initLogging" />
<property name="arguments">
<list>
<value>classpath:log4j.xml</value>
<value>60000</value> <!-- Refresh Log4j config every 60 seconds -->
</list>
</property>
</bean>
</beans>
然后这是 camel-context.xml(我一直在注释掉从各个教程里抄来的代码,所以它看起来可能有些奇怪)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Camel context for the AMQ/WMQ bridge: declares the route builder, the JMS connection beans,
     an embedded ActiveMQ broker and the "jms" Camel component. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Camel context whose routes are supplied by the bridgeRouteConfig bean defined below. -->
<camel:camelContext id="defaultCamelContext">
<camel:routeBuilder ref="bridgeRouteConfig"/>
<!--<camel:jmxAgent id="agent" createConnector="true"/>-->
</camel:camelContext>
<!-- Route builder; its four endpoint URIs are injected from properties by constructor-argument name. -->
<bean id="bridgeRouteConfig" class="com.caci.asg.rail.tops.bridge.TopsBridgeRouteBuilder">
<constructor-arg name="amqToBridgeQueue" value="${topsBridgeRouteBuilder.route.amqToBridgeRoute}"/>
<constructor-arg name="bridgeToWmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToWmqRoute}"/>
<constructor-arg name="wmqToBridgeQueue" value="${topsBridgeRouteBuilder.route.wmqToBridgeRoute}"/>
<constructor-arg name="bridgeToAmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToAmqRoute}"/>
</bean>
<!-- ActiveMQ connection factory pointing at the broker URL taken from application.activemq.url. -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${application.activemq.url}"/>
<property name="useAsyncSend" value="true"/>
</bean>
<!-- Connection pool (maxConnections = 8) wrapping jmsFactory; started and stopped with the context. -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
<!-- JMS configuration using the pooled factory with 10 concurrent consumers.
     NOTE(review): no bean in this file references jmsConfig, so the "jms" component below does not
     appear to use it (or the pool). Confirm whether a "configuration" property was intended there. -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="concurrentConsumers" value="10"/>
</bean>
<!-- Configure the embedded ActiveMQ JMS broker (JMX enabled, non-persistent).
     NOTE(review): the TCP connector below binds the same application.activemq.url that jmsFactory
     connects to. If a standalone broker already listens on that address this may conflict; verify. -->
<broker:broker useJmx="true" persistent="false" brokerName="myBroker">
<broker:transportConnectors>
<!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
<broker:transportConnector name="vm" uri="vm://myBroker"/>
<!-- expose a TCP transport for clients to use -->
<broker:transportConnector name="tcp" uri="${application.activemq.url}"/>
</broker:transportConnectors>
</broker:broker>
<!-- Configure the Camel ActiveMQ component to use the embedded ActiveMQ broker declared above.
     The bean id "jms" matches the scheme of the jms: endpoint URIs used by the routes. -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="vm://myBroker"/>
</bean>
</beans>
camel-context.xml 使用了如下一些属性(在我让 AMQ 工作之前,引用 wmq 的那些是占位符)。
application.activemq.url=tcp://localhost:61616
topsBridgeRouteBuilder.route.amqToBridgeRoute=jms:apiToTopsQueue
topsBridgeRouteBuilder.route.bridgeToWmqRoute=mq:toWmq
topsBridgeRouteBuilder.route.wmqToBridgeRoute=mq:fromWmq
topsBridgeRouteBuilder.route.bridgeToAmqRoute=jms:topsToApiQueue
Java 编写的路由构建器(RouteBuilder)如下
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
/**
 * Camel route definitions for the bridge between ActiveMQ (AMQ) and WebSphere MQ (WMQ).
 *
 * <p>Each constructor argument is a Camel endpoint URI (for example {@code jms:apiToTopsQueue}).
 * Only the AMQ inbound route is currently active: it consumes from {@code amqToBridgeQueue} and
 * logs each message. The two forwarding routes are kept as commented-out code in
 * {@link #configure()} until the WMQ side is wired up.
 */
public class TopsBridgeRouteBuilder extends RouteBuilder {

    private final String amqToBridgeQueue;
    private final String bridgeToWmqQueue;
    private final String wmqToBridgeQueue;
    private final String bridgeToAmqQueue;

    /**
     * Creates the route builder.
     *
     * @param amqToBridgeQueue endpoint URI the bridge consumes from on the AMQ side
     * @param bridgeToWmqQueue endpoint URI the bridge will forward AMQ messages to (not yet used)
     * @param wmqToBridgeQueue endpoint URI the bridge will consume from on the WMQ side (not yet used)
     * @param bridgeToAmqQueue endpoint URI the bridge will forward WMQ messages to (not yet used)
     */
    public TopsBridgeRouteBuilder(String amqToBridgeQueue, String bridgeToWmqQueue,
                                  String wmqToBridgeQueue, String bridgeToAmqQueue) {
        this.amqToBridgeQueue = amqToBridgeQueue;
        this.bridgeToWmqQueue = bridgeToWmqQueue;
        this.wmqToBridgeQueue = wmqToBridgeQueue;
        this.bridgeToAmqQueue = bridgeToAmqQueue;
    }

    /**
     * Registers the single active route: consume from the AMQ inbound endpoint and log at WARN.
     *
     * @throws Exception declared by {@link RouteBuilder#configure()}
     */
    @Override
    public void configure() throws Exception {
        // Forwarding routes, disabled until the WMQ endpoints are real:
        // from(amqToBridgeQueue).to(bridgeToWmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToWmqQueue);
        // from(wmqToBridgeQueue).to(bridgeToAmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToAmqQueue);

        // Trailing space after "from" so the endpoint URI is not glued onto the word in the log line.
        from(amqToBridgeQueue).log(LoggingLevel.WARN, "Consuming message from " + amqToBridgeQueue);
    }
}
所以我不太确定为什么在我启动 Jetty 时它没有被列为 apiToTopsQueue 的使用者。我的 pom 包含 AMQ/WMQ/camel 库的依赖项。依赖如下(从父pom继承version/scope)
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.jmqi</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mqjms</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>connector</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>dhbcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
我写了这个测试,用来验证我能否把消息放到 AMQ 队列上,再从 AMQ 上把它取下来 - 测试可以通过,并且在 AMQ 管理页面上我可以看到 enqueued/dequeued 消息计数在增加。
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import javax.jms.*;
public class TopsBridgeRouteBuilderTest {
@Test
public void testAMessageAddedToAmqCanBeRetrieved() throws JMSException {
String brokerURL = "tcp://localhost:61616";
String amqQueue = "apiToTopsQueue";
String messageToSend = "Test message";
// Put a message on the AMQ
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(amqQueue);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage tm = session.createTextMessage(messageToSend);
producer.send(tm);
// Create read only consumer to take the message off the queue
ActiveMQConnectionFactory connectionFactoryReadOnly = new ActiveMQConnectionFactory(brokerURL);
Connection connectionReadOnly = connectionFactoryReadOnly.createConnection();
connectionReadOnly.start();
Session sessionReadOnly = connectionReadOnly.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReadOnly.createConsumer(destination);
final TextMessage message = (TextMessage) consumer.receive();
System.out.println("Message retrieved = " + message.getText());
assertThat(message.getText(), is(messageToSend));
}
}
是不是我的配置有问题,导致我启动 Jetty 时 Camel 路由没有运行、也没有去监听 AMQ?
谢谢。
原来是我在 web.xml 中遗漏了下面这一部分。吸取教训,以后要注意!
<!-- Bootstraps the Spring root application context when the web application starts.
     Without this listener the Spring XML (and therefore the Camel context it imports) is never loaded.
     NOTE(review): by default it reads /WEB-INF/applicationContext.xml unless a contextConfigLocation
     context-param is set; confirm which applies in this web.xml. -->
<listener>
<listener-class>
org.springframework.web.context.ContextLoaderListener
</listener-class>
</listener>