Java 监控 activemq 但不轮询队列

Java monitoring activemq but without polling the queue

我需要编写一些程序来监视 java 中的 activemq 队列。这意味着我需要记录消息何时入队以及何时出队。我的程序一定不能发消息,也不能接收消息,只需要记录即可。

我发现推送消息和接收消息,但这不是我想要做的,只是记录外部进程是否将消息放入队列或从队列中取出消息。

为了更清楚我画了一张图

我使用 apache camel 进行集成, 我的 routebuilder 看起来像

public void configure() throws Exception {
        Processor queueProcessor = new QueueProcessor();

        from("activemq:queue:KBC").process(queueProcessor);
    }

它调用followwing处理器

@Override
    public void process(Exchange exchange) throws Exception {
        Trax_EventDao dao = new Trax_EventDao();
        dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
    }

dao 处理数据库连接并插入一条记录

实际问题是,当我将一条消息推送到队列并且程序运行时,消息被记录下来,这没问题,但它也会立即被轮询,这是不对的。 如何在不轮询消息的情况下进行插入?

您可以使用 ActiveMQ 咨询消息来监控队列 activity...

http://activemq.apache.org/advisory-message.html

我最后做的是编写一个自己的运行器 class,它使用一个队列浏览器。

我想用这个 class 做的是

  1. 与 avtivemq 建立连接并启动它
  2. 做一个死循环,控制指定的队列。我有队列中的项目列表。在每个循环中我检查这个
  3. 如果列表大于队列的大小,则有项目出队。这意味着我需要循环它并检查哪些项目已出队。 否则我循环队列的枚举并将元素添加到列表中(如果它们尚不存在)

    package queueFeed;
    
    import dao.ProcmonDao;
    import dao.EventDao;
    import domain.Event;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    
    import javax.jms.*;
    import java.sql.SQLException;
    import java.sql.Time;
    import java.util.*;
    
    public class QueueRunner {
        private ProcmonDao dao;
        private Connection connection;
        private String queueName;
    
        public QueueRunner() throws SQLException {
            dao = new EventDao();
        }
    
    public void setConnection(String username, String password, String url) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        connection = factory.createConnection();
    }
    
    public void run() throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));
    
        List<String> ids = new ArrayList<>();
        int queueSize = 0;
        int counter = 0;
        connection.start();
    
        while (true) {
            Enumeration enumeration = browser.getEnumeration();
            if (queueSize < ids.size()) {
                while (enumeration.hasMoreElements()) {
                    Message message = (Message) enumeration.nextElement();
                    ids.remove(message.getJMSMessageID());
                    counter++;
                }
    
                if (ids.size() > 0 && ids.size() > 0) {
                    Iterator<String> iterator = ids.iterator();
                    while (iterator.hasNext()) {
                        String messageId = iterator.next();
                        dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                        iterator.remove();
                    }
                }
    
                queueSize = counter;
                counter = 0;
            } else {
    
                while (enumeration.hasMoreElements()) {
                    counter++;
                    Message message = (Message) enumeration.nextElement();
                    String id = message.getJMSMessageID();
                    if (!ids.contains(id)) {
                        ids.add(id);
                        dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                    }
                }
                queueSize = counter;
                counter = 0;
            }
        }
    }
    
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
    
    public String getQueueName() {
        return this.queueName;
    }
    

    }

这还不够完美。我觉得里面有个小逻辑问题