RabbitMQ CLI 状态不同于管理门户
RabbitMQ CLI status different from management portal
我注意到当我从命令行 (rabbitmqctl status
) 运行 RabbitMQ status 命令时,所有报告的数字都与我所知道的现实不符.我在管理门户网站上看到的内容证实了我的真实情况。
这是 CLI 状态的输出:
[{pid,26647},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.5.4"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.5.4"},
{webmachine,"webmachine","1.10.3-rmq3.5.4-gite9359c7"},
{mochiweb,"MochiMedia Web Server","2.7.0-rmq3.5.4-git680dba8"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.5.4"},
{rabbit,"RabbitMQ","3.5.4"},
{os_mon,"CPO CXC 138 46","2.3"},
{inets,"INETS CXC 138 49","5.10.4"},
{mnesia,"MNESIA CXC 138 12","4.12.4"},
{amqp_client,"RabbitMQ AMQP Client","3.5.4"},
{xmerl,"XML parser","1.3.7"},
{sasl,"SASL CXC 138 11","2.4.1"},
{stdlib,"ERTS CXC 138 10","2.3"},
{kernel,"ERTS CXC 138 10","3.1"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 17 [erts-6.3] [source] [64-bit] [smp:4:4] [async-threads:64] [kernel-poll:true]\n"},
{memory,
[{total,45994136},
{connection_readers,101856},
{connection_writers,54800},
{connection_channels,165968},
{connection_other,373008},
{queue_procs,175376},
{queue_slave_procs,0},
{plugins,437024},
{other_proc,13385792},
{mnesia,131904},
{mgmt_db,484216},
{msg_index,53112},
{other_ets,1119384},
{binary,3890640},
{code,20097289},
{atom,711569},
{other_system,4812198}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,787190579},
{disk_free_limit,50000000},
{disk_free,778919936},
{file_descriptors,
[{total_limit,924},
{total_used,13},
{sockets_limit,829},
{sockets_used,11}]},
{processes,[{limit,1048576},{used,350}]},
{run_queue,0},
{uptime,911}]
读者数量、作者数量、渠道数量等等,基本上每一个数字都是几千倍。
在管理门户(下面的 ss)中看到的数字是正确的。总共 10 个连接,每个连接有两个通道
我所有的队列都是非持久的,我只使用扇出交换发送非持久消息。据我了解,这应该意味着如果出现问题(这对我的需要很好),任何事情都不会持续存在。
我注意到,每当我旋转连接到代理的模块之一时,readers/writers 的数量在命令行上增加了 ~17,000,尽管只去了 up/down 1 在门户中。
这是我的代理配置代码供参考:
private String endPoint;
private int port;
private String userName;
private String password;
private Exchange publisherExchange;
private ExchangeType publisherExchangeType;
private Map<Exchange, ExchangeType> subscriptionExchanges;
private Channel publishChannel;
private Channel subscriptionChannel;
private Consumer consumer;
private BrokerHandler(BrokerHandlerBuilder builder) throws ConnectException{
this.endPoint = builder.endPoint;
this.port = builder.port;
this.userName = builder.userName;
this.password = builder.password;
this.publisherExchange = builder.publisherExchange;
this.publisherExchangeType = builder.publisherExchangeType;
this.subscriptionExchanges = builder.subscriptionExchanges;
connect();
}
private void connect() throws ConnectException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.endPoint);
factory.setPort(this.port);
if(this.userName != null && this.password != null){
factory.setUsername(this.userName);
factory.setPassword(this.password);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(RMQConstants.RABBITMQ_MAX_RETRY_DELAY);
}
try {
log.info("Registering with broker on topic " + this.publisherExchange.toString() + " on " + this.endPoint + "...");
connection = factory.newConnection();
publishChannel = connection.createChannel();
subscriptionChannel = connection.createChannel();
configureConsumer();
publishChannel.exchangeDeclare(this.publisherExchange.toString(), this.publisherExchangeType.toString());
} catch(Exception e){
throw new ConnectException("Unable to connect to RabbitMQ broker.");
}
if(this.subscriptionExchanges.size() > 0){
subscribe(this.subscriptionExchanges);
}
}
/**
* Allows callers to publish a message to the broker, which will be broadcast to all listeners using a FANOUT strategy
* @throws ConnectException if the handler is not connected to the broker
*/
private void publishToBroker(String msg) throws ConnectException{
try {
publishChannel.basicPublish(this.publisherExchange.toString(), "", null, msg.getBytes());
}
catch (IOException e) {
log.error("Unable to write message to broker.", e);
}
}
private void subscribe(Map<Exchange, ExchangeType> exchanges){
try {
String queueName = subscriptionChannel.queueDeclare().getQueue();
exchanges.forEach((k,v) -> {
try {
subscriptionChannel.exchangeDeclare(k.toString(), v.toString());
subscriptionChannel.queueBind(queueName, k.toString(), "");
} catch (Exception e) {
log.error("Error declaring exchanges for exchange: " + k.toString(), e);
}
});
subscriptionChannel.basicConsume(queueName, true, consumer);
}
catch(Exception e){
log.error("Error declaring a queue for subscription channel", e);
}
}
private void configureConsumer(){
consumer = new DefaultConsumer(subscriptionChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
handleMessage(message);
}
};
}
客户端使用构建器模式来实例化代理连接,此时他们指定他们的发布交换和他们希望订阅的任意数量的交换。该系统中总共只有 19 个交易所。
正在正确发布和接收消息,但我收到报告说代理正在使服务器停滞不前。我会更密切地监视它,但我真的很想能够从状态调用中解释这些古怪的结果。我试过停止应用程序并重置然后重新配置代理,这使连接计数回到 0,但是一旦模块开始重新连接,数字就会开始恢复。
感谢您抽出宝贵时间阅读本文。任何建议将不胜感激!
你报的connection_readers
、connection_writers
、connection_channels
等在memory
段,单位是字节,不是连接数。这与您报告的管理 UI 部分完全不同。
要通过 CLI 获取有关连接数的数据,请使用 rabbitmqctl list_connections
命令。
channels
Number of channels using the connection.
另请参阅 list_exchanges
、list_queues
和 list_consumers
。
我注意到当我从命令行 (rabbitmqctl status
) 运行 RabbitMQ status 命令时,所有报告的数字都与我所知道的现实不符.我在管理门户网站上看到的内容证实了我的真实情况。
这是 CLI 状态的输出:
[{pid,26647},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.5.4"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.5.4"},
{webmachine,"webmachine","1.10.3-rmq3.5.4-gite9359c7"},
{mochiweb,"MochiMedia Web Server","2.7.0-rmq3.5.4-git680dba8"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.5.4"},
{rabbit,"RabbitMQ","3.5.4"},
{os_mon,"CPO CXC 138 46","2.3"},
{inets,"INETS CXC 138 49","5.10.4"},
{mnesia,"MNESIA CXC 138 12","4.12.4"},
{amqp_client,"RabbitMQ AMQP Client","3.5.4"},
{xmerl,"XML parser","1.3.7"},
{sasl,"SASL CXC 138 11","2.4.1"},
{stdlib,"ERTS CXC 138 10","2.3"},
{kernel,"ERTS CXC 138 10","3.1"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 17 [erts-6.3] [source] [64-bit] [smp:4:4] [async-threads:64] [kernel-poll:true]\n"},
{memory,
[{total,45994136},
{connection_readers,101856},
{connection_writers,54800},
{connection_channels,165968},
{connection_other,373008},
{queue_procs,175376},
{queue_slave_procs,0},
{plugins,437024},
{other_proc,13385792},
{mnesia,131904},
{mgmt_db,484216},
{msg_index,53112},
{other_ets,1119384},
{binary,3890640},
{code,20097289},
{atom,711569},
{other_system,4812198}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,787190579},
{disk_free_limit,50000000},
{disk_free,778919936},
{file_descriptors,
[{total_limit,924},
{total_used,13},
{sockets_limit,829},
{sockets_used,11}]},
{processes,[{limit,1048576},{used,350}]},
{run_queue,0},
{uptime,911}]
读者数量、作者数量、渠道数量等等,基本上每一个数字都是几千倍。
在管理门户(下面的 ss)中看到的数字是正确的。总共 10 个连接,每个连接有两个通道
我所有的队列都是非持久的,我只使用扇出交换发送非持久消息。据我了解,这应该意味着如果出现问题(这对我的需要很好),任何事情都不会持续存在。
我注意到,每当我旋转连接到代理的模块之一时,readers/writers 的数量在命令行上增加了 ~17,000,尽管只去了 up/down 1 在门户中。
这是我的代理配置代码供参考:
private String endPoint;
private int port;
private String userName;
private String password;
private Exchange publisherExchange;
private ExchangeType publisherExchangeType;
private Map<Exchange, ExchangeType> subscriptionExchanges;
private Channel publishChannel;
private Channel subscriptionChannel;
private Consumer consumer;
private BrokerHandler(BrokerHandlerBuilder builder) throws ConnectException{
this.endPoint = builder.endPoint;
this.port = builder.port;
this.userName = builder.userName;
this.password = builder.password;
this.publisherExchange = builder.publisherExchange;
this.publisherExchangeType = builder.publisherExchangeType;
this.subscriptionExchanges = builder.subscriptionExchanges;
connect();
}
private void connect() throws ConnectException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.endPoint);
factory.setPort(this.port);
if(this.userName != null && this.password != null){
factory.setUsername(this.userName);
factory.setPassword(this.password);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(RMQConstants.RABBITMQ_MAX_RETRY_DELAY);
}
try {
log.info("Registering with broker on topic " + this.publisherExchange.toString() + " on " + this.endPoint + "...");
connection = factory.newConnection();
publishChannel = connection.createChannel();
subscriptionChannel = connection.createChannel();
configureConsumer();
publishChannel.exchangeDeclare(this.publisherExchange.toString(), this.publisherExchangeType.toString());
} catch(Exception e){
throw new ConnectException("Unable to connect to RabbitMQ broker.");
}
if(this.subscriptionExchanges.size() > 0){
subscribe(this.subscriptionExchanges);
}
}
/**
* Allows callers to publish a message to the broker, which will be broadcast to all listeners using a FANOUT strategy
* @throws ConnectException if the handler is not connected to the broker
*/
private void publishToBroker(String msg) throws ConnectException{
try {
publishChannel.basicPublish(this.publisherExchange.toString(), "", null, msg.getBytes());
}
catch (IOException e) {
log.error("Unable to write message to broker.", e);
}
}
private void subscribe(Map<Exchange, ExchangeType> exchanges){
try {
String queueName = subscriptionChannel.queueDeclare().getQueue();
exchanges.forEach((k,v) -> {
try {
subscriptionChannel.exchangeDeclare(k.toString(), v.toString());
subscriptionChannel.queueBind(queueName, k.toString(), "");
} catch (Exception e) {
log.error("Error declaring exchanges for exchange: " + k.toString(), e);
}
});
subscriptionChannel.basicConsume(queueName, true, consumer);
}
catch(Exception e){
log.error("Error declaring a queue for subscription channel", e);
}
}
private void configureConsumer(){
consumer = new DefaultConsumer(subscriptionChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
handleMessage(message);
}
};
}
客户端使用构建器模式来实例化代理连接,此时他们指定他们的发布交换和他们希望订阅的任意数量的交换。该系统中总共只有 19 个交易所。
正在正确发布和接收消息,但我收到报告说代理正在使服务器停滞不前。我会更密切地监视它,但我真的很想能够从状态调用中解释这些古怪的结果。我试过停止应用程序并重置然后重新配置代理,这使连接计数回到 0,但是一旦模块开始重新连接,数字就会开始恢复。
感谢您抽出宝贵时间阅读本文。任何建议将不胜感激!
你报的connection_readers
、connection_writers
、connection_channels
等在memory
段,单位是字节,不是连接数。这与您报告的管理 UI 部分完全不同。
要通过 CLI 获取有关连接数的数据,请使用 rabbitmqctl list_connections
命令。
channels
Number of channels using the connection.
另请参阅 list_exchanges
、list_queues
和 list_consumers
。