JEE7 + WildFly (HornetQ) - 从应用程序暂停队列
JEE7 + WildFly (HornetQ) - Pause queue from application
我们正在使用 WildFly + HornetQ 作为我们的应用程序服务器和 JMS 消息队列,并且需要能够从应用程序中 pause/resume 队列。这可能吗?
您是否正在寻找停止和开始传递消息的方法?如果是,则 JMS 定义 connection.Stop
方法来暂停消息传递。可以使用 connection.Start
方法恢复消息传递。
因此 HornetQ JMS 客户端将实现这些方法。您将需要使用这些方法。
这可以使用 JMX 或使用 hornetq 核心管理来完成 api。
出于本示例的目的,使用了 wildfly 8.1。0.Final 运行 standalone-full-ha 配置文件。
所需的 Maven 依赖项:
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-client</artifactId>
<version>2.4.1.Final</version>
</dependency>
<dependency>
<groupId>org.wildfly</groupId>
<artifactId>wildfly-jmx</artifactId>
<version>8.1.0.Final</version>
</dependency>
这是一个测试 class 演示如何通过 JMX 使用 JmsQueueControl:
package test.jmx.hornetq;
import org.hornetq.api.jms.management.JMSQueueControl;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
public class WildflyJmsControl {
public static void main(String[] args) {
try {
//Get a connection to the WildFly 8 MBean server on localhost
String host = "localhost";
int port = 9990; // management-web port
String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port);
JMXServiceURL serviceURL = new JMXServiceURL(urlString);
JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null);
MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
String queueName = "testQueue"; // use your queue name here
String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
ObjectName objectName = ObjectName.getInstance(mbeanObjectName);
JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false);
assert jmsQueueControl != null;
long msgCount = jmsQueueControl.countMessages(null);
System.out.println(mbeanObjectName + " message count: " + msgCount);
jmsQueueControl.pause();
System.out.println("queue paused");
jmsQueueControl.resume();
System.out.println("queue resumed");
jmxConnector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
要通过 JMS 访问 hornetq 管理,请使用:
package test.jms.hornetq;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
public class HornetqService {
public void testPauseResumeQueue() {
// this class needs to run in the same jvm as the wildfly server (i.e. not a remote jvm)
try {
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
InVMConnectorFactory.class.getName()));
ClientSession session = locator.createSessionFactory().createSession();
session.start();
ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");
String queueName = "testQueue"; // use your queue name here
// get queue message count
ClientMessage message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "messageCount");
ClientMessage reply = requester.request(message);
int count = (Integer) ManagementHelper.getResult(reply);
System.out.println("There are " + count + " messages in exampleQueue");
// pause the queue
message = session.createMessage(false);
ManagementHelper.putOperationInvocation(message, queueName, "pause");
requester.request(message);
// get queue paused
message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "paused");
reply = requester.request(message);
Object result = ManagementHelper.getResult(reply);
System.out.println("result: " + result.getClass().getName() + " : " + result.toString());
// resume queue
message = session.createMessage(false);
ManagementHelper.putOperationInvocation(message, queueName, "resume");
requester.request(message);
// get queue paused
message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "paused");
reply = requester.request(message);
Object result2 = ManagementHelper.getResult(reply);
System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());
requester.close();
session.close();
}catch (Exception e){
System.out.println("Error pausing queue" + e.getMessage());
}
}
}
我们正在使用 WildFly + HornetQ 作为我们的应用程序服务器和 JMS 消息队列,并且需要能够从应用程序中 pause/resume 队列。这可能吗?
您是否正在寻找停止和开始传递消息的方法?如果是,则 JMS 定义 connection.Stop
方法来暂停消息传递。可以使用 connection.Start
方法恢复消息传递。
因此 HornetQ JMS 客户端将实现这些方法。您将需要使用这些方法。
这可以使用 JMX 或使用 hornetq 核心管理来完成 api。
出于本示例的目的,使用了 wildfly 8.1。0.Final 运行 standalone-full-ha 配置文件。
所需的 Maven 依赖项:
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-client</artifactId>
<version>2.4.1.Final</version>
</dependency>
<dependency>
<groupId>org.wildfly</groupId>
<artifactId>wildfly-jmx</artifactId>
<version>8.1.0.Final</version>
</dependency>
这是一个测试 class 演示如何通过 JMX 使用 JmsQueueControl:
package test.jmx.hornetq;
import org.hornetq.api.jms.management.JMSQueueControl;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
public class WildflyJmsControl {
public static void main(String[] args) {
try {
//Get a connection to the WildFly 8 MBean server on localhost
String host = "localhost";
int port = 9990; // management-web port
String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port);
JMXServiceURL serviceURL = new JMXServiceURL(urlString);
JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null);
MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
String queueName = "testQueue"; // use your queue name here
String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
ObjectName objectName = ObjectName.getInstance(mbeanObjectName);
JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false);
assert jmsQueueControl != null;
long msgCount = jmsQueueControl.countMessages(null);
System.out.println(mbeanObjectName + " message count: " + msgCount);
jmsQueueControl.pause();
System.out.println("queue paused");
jmsQueueControl.resume();
System.out.println("queue resumed");
jmxConnector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
要通过 JMS 访问 hornetq 管理,请使用:
package test.jms.hornetq;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
public class HornetqService {
public void testPauseResumeQueue() {
// this class needs to run in the same jvm as the wildfly server (i.e. not a remote jvm)
try {
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
InVMConnectorFactory.class.getName()));
ClientSession session = locator.createSessionFactory().createSession();
session.start();
ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");
String queueName = "testQueue"; // use your queue name here
// get queue message count
ClientMessage message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "messageCount");
ClientMessage reply = requester.request(message);
int count = (Integer) ManagementHelper.getResult(reply);
System.out.println("There are " + count + " messages in exampleQueue");
// pause the queue
message = session.createMessage(false);
ManagementHelper.putOperationInvocation(message, queueName, "pause");
requester.request(message);
// get queue paused
message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "paused");
reply = requester.request(message);
Object result = ManagementHelper.getResult(reply);
System.out.println("result: " + result.getClass().getName() + " : " + result.toString());
// resume queue
message = session.createMessage(false);
ManagementHelper.putOperationInvocation(message, queueName, "resume");
requester.request(message);
// get queue paused
message = session.createMessage(false);
ManagementHelper.putAttribute(message, queueName, "paused");
reply = requester.request(message);
Object result2 = ManagementHelper.getResult(reply);
System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());
requester.close();
session.close();
}catch (Exception e){
System.out.println("Error pausing queue" + e.getMessage());
}
}
}