Java 监控 activemq 但不轮询队列
Java monitoring activemq but without polling the queue
我需要编写一些程序来监视 java 中的 activemq 队列。这意味着我需要记录消息何时入队以及何时出队。我的程序一定不能发消息,也不能接收消息,只需要记录即可。
我发现推送消息和接收消息,但这不是我想要做的,只是记录外部进程是否将消息放入队列或从队列中取出消息。
为了更清楚我画了一张图
我使用 apache camel 进行集成,
我的 routebuilder 看起来像
public void configure() throws Exception {
Processor queueProcessor = new QueueProcessor();
from("activemq:queue:KBC").process(queueProcessor);
}
它调用followwing处理器
@Override
public void process(Exchange exchange) throws Exception {
Trax_EventDao dao = new Trax_EventDao();
dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
}
dao 处理数据库连接并插入一条记录
实际问题是,当我将一条消息推送到队列并且程序运行时,消息被记录下来,这没问题,但它也会立即被轮询,这是不对的。
如何在不轮询消息的情况下进行插入?
您可以使用 ActiveMQ 咨询消息来监控队列 activity...
我最后做的是编写一个自己的运行器 class,它使用一个队列浏览器。
我想用这个 class 做的是
- 与 avtivemq 建立连接并启动它
- 做一个死循环,控制指定的队列。我有队列中的项目列表。在每个循环中我检查这个
如果列表大于队列的大小,则有项目出队。这意味着我需要循环它并检查哪些项目已出队。
否则我循环队列的枚举并将元素添加到列表中(如果它们尚不存在)
package queueFeed;
import dao.ProcmonDao;
import dao.EventDao;
import domain.Event;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
import java.sql.SQLException;
import java.sql.Time;
import java.util.*;
public class QueueRunner {
private ProcmonDao dao;
private Connection connection;
private String queueName;
public QueueRunner() throws SQLException {
dao = new EventDao();
}
public void setConnection(String username, String password, String url) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
connection = factory.createConnection();
}
public void run() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));
List<String> ids = new ArrayList<>();
int queueSize = 0;
int counter = 0;
connection.start();
while (true) {
Enumeration enumeration = browser.getEnumeration();
if (queueSize < ids.size()) {
while (enumeration.hasMoreElements()) {
Message message = (Message) enumeration.nextElement();
ids.remove(message.getJMSMessageID());
counter++;
}
if (ids.size() > 0 && ids.size() > 0) {
Iterator<String> iterator = ids.iterator();
while (iterator.hasNext()) {
String messageId = iterator.next();
dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
iterator.remove();
}
}
queueSize = counter;
counter = 0;
} else {
while (enumeration.hasMoreElements()) {
counter++;
Message message = (Message) enumeration.nextElement();
String id = message.getJMSMessageID();
if (!ids.contains(id)) {
ids.add(id);
dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
}
}
queueSize = counter;
counter = 0;
}
}
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getQueueName() {
return this.queueName;
}
}
这还不够完美。我觉得里面有个小逻辑问题
我需要编写一些程序来监视 java 中的 activemq 队列。这意味着我需要记录消息何时入队以及何时出队。我的程序一定不能发消息,也不能接收消息,只需要记录即可。
我发现推送消息和接收消息,但这不是我想要做的,只是记录外部进程是否将消息放入队列或从队列中取出消息。
为了更清楚我画了一张图
我使用 apache camel 进行集成, 我的 routebuilder 看起来像
public void configure() throws Exception {
Processor queueProcessor = new QueueProcessor();
from("activemq:queue:KBC").process(queueProcessor);
}
它调用followwing处理器
@Override
public void process(Exchange exchange) throws Exception {
Trax_EventDao dao = new Trax_EventDao();
dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
}
dao 处理数据库连接并插入一条记录
实际问题是,当我将一条消息推送到队列并且程序运行时,消息被记录下来,这没问题,但它也会立即被轮询,这是不对的。 如何在不轮询消息的情况下进行插入?
您可以使用 ActiveMQ 咨询消息来监控队列 activity...
我最后做的是编写一个自己的运行器 class,它使用一个队列浏览器。
我想用这个 class 做的是
- 与 avtivemq 建立连接并启动它
- 做一个死循环,控制指定的队列。我有队列中的项目列表。在每个循环中我检查这个
如果列表大于队列的大小,则有项目出队。这意味着我需要循环它并检查哪些项目已出队。 否则我循环队列的枚举并将元素添加到列表中(如果它们尚不存在)
package queueFeed; import dao.ProcmonDao; import dao.EventDao; import domain.Event; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import javax.jms.*; import java.sql.SQLException; import java.sql.Time; import java.util.*; public class QueueRunner { private ProcmonDao dao; private Connection connection; private String queueName; public QueueRunner() throws SQLException { dao = new EventDao(); } public void setConnection(String username, String password, String url) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url); connection = factory.createConnection(); } public void run() throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName)); List<String> ids = new ArrayList<>(); int queueSize = 0; int counter = 0; connection.start(); while (true) { Enumeration enumeration = browser.getEnumeration(); if (queueSize < ids.size()) { while (enumeration.hasMoreElements()) { Message message = (Message) enumeration.nextElement(); ids.remove(message.getJMSMessageID()); counter++; } if (ids.size() > 0 && ids.size() > 0) { Iterator<String> iterator = ids.iterator(); while (iterator.hasNext()) { String messageId = iterator.next(); dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime()))); iterator.remove(); } } queueSize = counter; counter = 0; } else { while (enumeration.hasMoreElements()) { counter++; Message message = (Message) enumeration.nextElement(); String id = message.getJMSMessageID(); if (!ids.contains(id)) { ids.add(id); dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime()))); } } queueSize = counter; counter = 0; } } } public void setQueueName(String queueName) { this.queueName = queueName; } public String getQueueName() { return this.queueName; }
}
这还不够完美。我觉得里面有个小逻辑问题