onMessage 调用后 ActiveMQ 消费者内存使用量不断增加
ActiveMQ consumer memory usage keep increasing after onMessage call
我正在尝试使用 Flatbuffers 的 ActiveMQ。生产者的一切似乎都运行良好,但消费者的记忆持续上升,过程时间越长 运行s。
生产者将消息标记为NON_PERSISTENT
并每秒发送大约30次。每条消息都是一个字节消息,大约 3000 字节。
Producer.cpp
void Producer::send_message(uint8_t* pointer, size_t size) {
auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
producer->send(msg.get());
}
void Producer::run() {
try {
std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
connection.reset(connectionFactory->createConnection());
session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createTopic(destURI));
producer.reset(session->createProducer(destination.get()));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
connection->start();
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}
Consumer.cpp
void Consumer::onMessage(const cms::Message * message)
{
try
{
const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
const auto data = msg->getBodyBytes();
const auto size = msg->getBodyLength();
flatbuffers::Verifier verifier((uint8_t*)(data), size);
if (Ditto::VerifyDataBuffer(verifier)) {
// Do something with the buffer
}
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}
void Consumer::run()
{
try {
std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
connection.reset(connectionFactory->createConnection());
std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
if (amqConnection != nullptr) {
amqConnection->addTransportListener(this);
}
connection->start();
connection->setExceptionListener(this);
session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createTopic(destURI));
consumer.reset(session->createConsumer(destination.get()));
consumer->setMessageListener(this);
}
catch (cms::CMSException& e) {
e.printStackTrace();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
}
然后,我调用了 Consumer
:
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
consumer.run();
while (1) {}
consumer.close();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
Consumer
能够接收和处理消息。但是,Consumer
的内存一直在上升。 10 分钟后内存约为 200MB 运行。在 CMS 概述中,他们提到传递给 onMessage
的指针由调用拥有,所以我不应该尝试删除它。然而,调用者似乎从未删除消息,这使得内存不断增加。
有什么方法可以在每次 onMessage
调用后释放消息的内存?
非常感谢您的宝贵时间和帮助。
我想通了。
getBodyBytes()
return 一个指向我应该在调用后清理的数组的指针。所以我只需要将它包装在 std::unique_ptr
中以便正确清理它。
onMessage()
应该是这样的:
void Consumer::onMessage(const cms::Message * message)
{
try
{
const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
std::unique_ptr<unsigned char> data(msg->getBodyBytes());
auto size = msg->getBodyLength();
flatbuffers::Verifier verifier((uint8_t*)(data), size);
if (Ditto::VerifyDataBuffer(verifier)) {
// Do something with the buffer
}
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}
我正在尝试使用 Flatbuffers 的 ActiveMQ。生产者的一切似乎都运行良好,但消费者的记忆持续上升,过程时间越长 运行s。
生产者将消息标记为NON_PERSISTENT
并每秒发送大约30次。每条消息都是一个字节消息,大约 3000 字节。
Producer.cpp
void Producer::send_message(uint8_t* pointer, size_t size) {
auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
producer->send(msg.get());
}
void Producer::run() {
try {
std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
connection.reset(connectionFactory->createConnection());
session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createTopic(destURI));
producer.reset(session->createProducer(destination.get()));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
connection->start();
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}
Consumer.cpp
void Consumer::onMessage(const cms::Message * message)
{
try
{
const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
const auto data = msg->getBodyBytes();
const auto size = msg->getBodyLength();
flatbuffers::Verifier verifier((uint8_t*)(data), size);
if (Ditto::VerifyDataBuffer(verifier)) {
// Do something with the buffer
}
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}
void Consumer::run()
{
try {
std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));
connection.reset(connectionFactory->createConnection());
std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
if (amqConnection != nullptr) {
amqConnection->addTransportListener(this);
}
connection->start();
connection->setExceptionListener(this);
session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createTopic(destURI));
consumer.reset(session->createConsumer(destination.get()));
consumer->setMessageListener(this);
}
catch (cms::CMSException& e) {
e.printStackTrace();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
}
然后,我调用了 Consumer
:
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
consumer.run();
while (1) {}
consumer.close();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
Consumer
能够接收和处理消息。但是,Consumer
的内存一直在上升。 10 分钟后内存约为 200MB 运行。在 CMS 概述中,他们提到传递给 onMessage
的指针由调用拥有,所以我不应该尝试删除它。然而,调用者似乎从未删除消息,这使得内存不断增加。
有什么方法可以在每次 onMessage
调用后释放消息的内存?
非常感谢您的宝贵时间和帮助。
我想通了。
getBodyBytes()
return 一个指向我应该在调用后清理的数组的指针。所以我只需要将它包装在 std::unique_ptr
中以便正确清理它。
onMessage()
应该是这样的:
void Consumer::onMessage(const cms::Message * message)
{
try
{
const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
std::unique_ptr<unsigned char> data(msg->getBodyBytes());
auto size = msg->getBodyLength();
flatbuffers::Verifier verifier((uint8_t*)(data), size);
if (Ditto::VerifyDataBuffer(verifier)) {
// Do something with the buffer
}
}
catch (cms::CMSException& e) {
e.printStackTrace();
}
}