onMessage 调用后 ActiveMQ 消费者内存使用量不断增加

ActiveMQ consumer memory usage keep increasing after onMessage call

我正在尝试使用 Flatbuffers 的 ActiveMQ。生产者的一切似乎都运行良好,但消费者的记忆持续上升,过程时间越长 运行s。

生产者将消息标记为NON_PERSISTENT并每秒发送大约30次。每条消息都是一个字节消息,大约 3000 字节。

Producer.cpp

void Producer::send_message(uint8_t* pointer, size_t size) {
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());
}

void Producer::run() {
    try {
        std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

Consumer.cpp

void Consumer::onMessage(const cms::Message * message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

void Consumer::run()
{
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());

        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) {
            amqConnection->addTransportListener(this);
        }

        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        consumer->setMessageListener(this);
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
}

然后,我调用了 Consumer

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();

    while (1) {}

    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

Consumer 能够接收和处理消息。但是,Consumer的内存一直在上升。 10 分钟后内存约为 200MB 运行。在 CMS 概述中,他们提到传递给 onMessage 的指针由调用拥有,所以我不应该尝试删除它。然而,调用者似乎从未删除消息,这使得内存不断增加。

有什么方法可以在每次 onMessage 调用后释放消息的内存?

非常感谢您的宝贵时间和帮助。

我想通了。

getBodyBytes() return 一个指向我应该在调用后清理的数组的指针。所以我只需要将它包装在 std::unique_ptr 中以便正确清理它。

onMessage() 应该是这样的:

void Consumer::onMessage(const cms::Message * message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        std::unique_ptr<unsigned char> data(msg->getBodyBytes());

        auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}