如何获取 java 中使用 activeMQ 创建的队列处理的消息数?
How to get number of messages processed by queue created with activeMQ in java?
我们使用 apache camel 创建了路由,其中我们使用 activeMQ 从队列发布和订阅消息
请在下面找到使用的 activeMQ 配置
<beanid="activemqT3"class="org.apache.activemq.camel.component.ActiveMQComponent">
<propertyname="configuration"ref="jmsConfigT3"/>
<propertyname="transacted"value="true"/>
</bean>
<beanid="jmsConfigT3"class="org.apache.camel.component.jms.JmsConfiguration">
<propertyname="cacheLevelName"value="CACHE_CONSUMER"/>
<propertyname="connectionFactory"ref="pooledConnectionFactoryT3"/>
<propertyname="concurrentConsumers"value="10"/>
<propertyname="receiveTimeout"value="60000"/>
<propertyname="transacted"value="true"/>
</bean>
<beanid="pooledConnectionFactoryT3"class="org.apache.activemq.pool.PooledConnectionFactory"init-method="start"destroy-method="stop">
<propertyname="maxConnections"value="8"/>
<propertyname="connectionFactory"ref="jmsConnectionFactoryT3"/>
</bean>
<beanid="jmsConnectionFactoryT3"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="${com.integration.amq.brokerURL}"/>
<propertyname="userName"value="${com.integration.activemq.username}"/>
<propertyname="password"value="${com.integration.activemq.password}"/>
<propertyname="prefetchPolicy"ref="prefetchPolicyT3"/>
<propertyname="redeliveryPolicy">
<beanclass="org.apache.activemq.RedeliveryPolicy">
<propertyname="maximumRedeliveries"value="0"/>
</bean>
</property>
</bean>
<beanid="prefetchPolicyT3"class="org.apache.activemq.ActiveMQPrefetchPolicy">
<propertyname="queuePrefetch"value="0"/>
</bean>
现在使用独立的 java 程序,我们想知道每个 queue.Can 有多少条消息 published/subscribed 请告诉我示例 program/example
尝试了以下 java 代码
字符串 url =
"service:jmx:rmi:///jndi/rmi://abcdefdv302.attpmerd.com:1099/jmxrmi";
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
MBeanServerConnection connection = connector.getMBeanServerConnection();
// get queue size
ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=abcdefdv302.attpmerd.com,destinationType=Queue,destinationName=tmp.incoming.master");
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers, DestinationViewMBean.class, true);
long queueSize = mbView.getQueueSize();
System.out.println(queueSize);
低于异常
Exception in thread "main" java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.NameNotFoundException: jmxrmi
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:268)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:227)
at amd.esb.amq.App.main(App.java:43)
Caused by: javax.naming.NameNotFoundException: jmxrmi
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:116)
at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:203)
at javax.naming.InitialContext.lookup(InitialContext.java:411)
at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1936)
at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1903)
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:286)
使用jolokia后出错
Exception in thread "main" org.jolokia.client.exception.J4pConnectException: Cannot connect to http://atlesbdv02.amd.com:8161/api/jolokia: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
at org.jolokia.client.J4pClient.mapException(J4pClient.java:325)
at org.jolokia.client.J4pClient.execute(J4pClient.java:198)
at org.jolokia.client.J4pClient.execute(J4pClient.java:168)
at org.jolokia.client.J4pClient.execute(J4pClient.java:117)
at com.mycompany.camel.activemq.ActiveMQClient.getNumberOfConsumedMessages(ActiveMQClient.java:33)
at com.mycompany.camel.activemq.ActiveMQClient.main(ActiveMQClient.java:19)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:140)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:314)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:363)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:219)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:195)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:86)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:108)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at org.jolokia.client.J4pClient.execute(J4pClient.java:190)
... 4 more
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:72)
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:117)
... 15 more
这是一个使用 jolokia rest 接口的示例,我发现与 JMX 相比,它更容易使用。
首先检查 jolokia api 是否可用,转到 http://localhost:8161/api/jolokia。 (将 localhost 替换为您经纪人的主机名)您应该会看到一个 json 响应,其中包含一些经纪人详细信息。
将以下依赖项添加到您的 Maven 项目中:
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-client-java</artifactId>
<version>1.3.4</version>
</dependency>
然后使用下面的代码获取队列中(取消)排队消息的数量。
import org.jolokia.client.J4pClient;
import org.jolokia.client.exception.J4pException;
import org.jolokia.client.request.J4pReadRequest;
import org.jolokia.client.request.J4pResponse;
import javax.management.MalformedObjectNameException;
public class ActiveMQClient {
private J4pClient j4pClient;
private String brokerName;
public static void main(String[] args) throws MalformedObjectNameException, J4pException {
ActiveMQClient activeMQClient = new ActiveMQClient("localhost", "user", "password", "localhost");
System.out.println(activeMQClient.getNumberOfConsumedMessages("yourQueue"));
System.out.println(activeMQClient.getNumberOfEnqueuedMessages("yourQueue"));
}
public ActiveMQClient(String host, String user, String password, String brokerName) {
this.brokerName = brokerName;
j4pClient = J4pClient.url("http://" + host + ":8161/api/jolokia")
.user(user)
.password(password)
.build();
}
public Long getNumberOfConsumedMessages(String queueName) throws MalformedObjectNameException, J4pException {
J4pReadRequest j4pReadRequest = new J4pReadRequest("org.apache.activemq:brokerName=" + brokerName + ",destinationName=" + queueName + ",destinationType=Queue,type=Broker", "DequeueCount");
J4pResponse<J4pReadRequest> response = j4pClient.execute(j4pReadRequest);
return response.getValue();
}
public Long getNumberOfEnqueuedMessages(String queueName) throws MalformedObjectNameException, J4pException {
J4pReadRequest j4pReadRequest = new J4pReadRequest("org.apache.activemq:brokerName=" + brokerName + ",destinationName=" + queueName + ",destinationType=Queue,type=Broker", "EnqueueCount");
J4pResponse<J4pReadRequest> response = j4pClient.execute(j4pReadRequest);
return response.getValue();
}
}
我们可以使用 jconsole - java java 的管理控制台来获取队列的所有属性,包括处理的消息数。
有关 jconsole
的更多详细信息,请参阅下文 link
http://activemq.apache.org/jmx.html
https://docs.oracle.com/javase/7/docs/technotes/guides/management/jconsole.html
使用以下代码使用 jconsole 和 jmx 获取队列计数 url
JMXConnector connector;
jmxURL = service:jmx:rmi:///jndi/rmi://hostname:1099/karaf-root;
String[] userCredentialsArray = new String[2];
userCredentialsArray[0] = xxx;
userCredentialsArray[1] = xxx;
queueName=test;
connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL),credentialsMap);
LOGGER.info("JMX connection established " + connector);
MBeanServerConnection connection = connector.getMBeanServerConnection();
LOGGER.info("MBeanServerConnection connection established "+ connection);
ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName="+ queueName);
LOGGER.info("ObjectName created " + nameConsumers);
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers,DestinationViewMBean.class, true);
System.out.println("count - "+mbView.getQueueSize() );
我们使用 apache camel 创建了路由,其中我们使用 activeMQ 从队列发布和订阅消息
请在下面找到使用的 activeMQ 配置
<beanid="activemqT3"class="org.apache.activemq.camel.component.ActiveMQComponent">
<propertyname="configuration"ref="jmsConfigT3"/>
<propertyname="transacted"value="true"/>
</bean>
<beanid="jmsConfigT3"class="org.apache.camel.component.jms.JmsConfiguration">
<propertyname="cacheLevelName"value="CACHE_CONSUMER"/>
<propertyname="connectionFactory"ref="pooledConnectionFactoryT3"/>
<propertyname="concurrentConsumers"value="10"/>
<propertyname="receiveTimeout"value="60000"/>
<propertyname="transacted"value="true"/>
</bean>
<beanid="pooledConnectionFactoryT3"class="org.apache.activemq.pool.PooledConnectionFactory"init-method="start"destroy-method="stop">
<propertyname="maxConnections"value="8"/>
<propertyname="connectionFactory"ref="jmsConnectionFactoryT3"/>
</bean>
<beanid="jmsConnectionFactoryT3"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="${com.integration.amq.brokerURL}"/>
<propertyname="userName"value="${com.integration.activemq.username}"/>
<propertyname="password"value="${com.integration.activemq.password}"/>
<propertyname="prefetchPolicy"ref="prefetchPolicyT3"/>
<propertyname="redeliveryPolicy">
<beanclass="org.apache.activemq.RedeliveryPolicy">
<propertyname="maximumRedeliveries"value="0"/>
</bean>
</property>
</bean>
<beanid="prefetchPolicyT3"class="org.apache.activemq.ActiveMQPrefetchPolicy">
<propertyname="queuePrefetch"value="0"/>
</bean>
现在使用独立的 java 程序,我们想知道每个 queue.Can 有多少条消息 published/subscribed 请告诉我示例 program/example
尝试了以下 java 代码 字符串 url =
"service:jmx:rmi:///jndi/rmi://abcdefdv302.attpmerd.com:1099/jmxrmi";
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
MBeanServerConnection connection = connector.getMBeanServerConnection();
// get queue size
ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=abcdefdv302.attpmerd.com,destinationType=Queue,destinationName=tmp.incoming.master");
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers, DestinationViewMBean.class, true);
long queueSize = mbView.getQueueSize();
System.out.println(queueSize);
低于异常
Exception in thread "main" java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.NameNotFoundException: jmxrmi
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:268)
at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:227)
at amd.esb.amq.App.main(App.java:43)
Caused by: javax.naming.NameNotFoundException: jmxrmi
at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:116)
at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:203)
at javax.naming.InitialContext.lookup(InitialContext.java:411)
at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1936)
at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1903)
at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:286)
使用jolokia后出错
Exception in thread "main" org.jolokia.client.exception.J4pConnectException: Cannot connect to http://atlesbdv02.amd.com:8161/api/jolokia: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
at org.jolokia.client.J4pClient.mapException(J4pClient.java:325)
at org.jolokia.client.J4pClient.execute(J4pClient.java:198)
at org.jolokia.client.J4pClient.execute(J4pClient.java:168)
at org.jolokia.client.J4pClient.execute(J4pClient.java:117)
at com.mycompany.camel.activemq.ActiveMQClient.getNumberOfConsumedMessages(ActiveMQClient.java:33)
at com.mycompany.camel.activemq.ActiveMQClient.main(ActiveMQClient.java:19)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:140)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:314)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:363)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:219)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:195)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:86)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:108)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at org.jolokia.client.J4pClient.execute(J4pClient.java:190)
... 4 more
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:72)
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:117)
... 15 more
这是一个使用 jolokia rest 接口的示例,我发现与 JMX 相比,它更容易使用。
首先检查 jolokia api 是否可用,转到 http://localhost:8161/api/jolokia。 (将 localhost 替换为您经纪人的主机名)您应该会看到一个 json 响应,其中包含一些经纪人详细信息。
将以下依赖项添加到您的 Maven 项目中:
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-client-java</artifactId>
<version>1.3.4</version>
</dependency>
然后使用下面的代码获取队列中(取消)排队消息的数量。
import org.jolokia.client.J4pClient;
import org.jolokia.client.exception.J4pException;
import org.jolokia.client.request.J4pReadRequest;
import org.jolokia.client.request.J4pResponse;
import javax.management.MalformedObjectNameException;
public class ActiveMQClient {
private J4pClient j4pClient;
private String brokerName;
public static void main(String[] args) throws MalformedObjectNameException, J4pException {
ActiveMQClient activeMQClient = new ActiveMQClient("localhost", "user", "password", "localhost");
System.out.println(activeMQClient.getNumberOfConsumedMessages("yourQueue"));
System.out.println(activeMQClient.getNumberOfEnqueuedMessages("yourQueue"));
}
public ActiveMQClient(String host, String user, String password, String brokerName) {
this.brokerName = brokerName;
j4pClient = J4pClient.url("http://" + host + ":8161/api/jolokia")
.user(user)
.password(password)
.build();
}
public Long getNumberOfConsumedMessages(String queueName) throws MalformedObjectNameException, J4pException {
J4pReadRequest j4pReadRequest = new J4pReadRequest("org.apache.activemq:brokerName=" + brokerName + ",destinationName=" + queueName + ",destinationType=Queue,type=Broker", "DequeueCount");
J4pResponse<J4pReadRequest> response = j4pClient.execute(j4pReadRequest);
return response.getValue();
}
public Long getNumberOfEnqueuedMessages(String queueName) throws MalformedObjectNameException, J4pException {
J4pReadRequest j4pReadRequest = new J4pReadRequest("org.apache.activemq:brokerName=" + brokerName + ",destinationName=" + queueName + ",destinationType=Queue,type=Broker", "EnqueueCount");
J4pResponse<J4pReadRequest> response = j4pClient.execute(j4pReadRequest);
return response.getValue();
}
}
我们可以使用 jconsole - java java 的管理控制台来获取队列的所有属性,包括处理的消息数。 有关 jconsole
的更多详细信息,请参阅下文 linkhttp://activemq.apache.org/jmx.html
https://docs.oracle.com/javase/7/docs/technotes/guides/management/jconsole.html
使用以下代码使用 jconsole 和 jmx 获取队列计数 url
JMXConnector connector;
jmxURL = service:jmx:rmi:///jndi/rmi://hostname:1099/karaf-root;
String[] userCredentialsArray = new String[2];
userCredentialsArray[0] = xxx;
userCredentialsArray[1] = xxx;
queueName=test;
connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL),credentialsMap);
LOGGER.info("JMX connection established " + connector);
MBeanServerConnection connection = connector.getMBeanServerConnection();
LOGGER.info("MBeanServerConnection connection established "+ connection);
ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName="+ queueName);
LOGGER.info("ObjectName created " + nameConsumers);
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers,DestinationViewMBean.class, true);
System.out.println("count - "+mbView.getQueueSize() );