ActiveMQ + 网络管理系统。未遵守预取限制
ActiveMQ + NMS. Prefetch limit not respected
我有一个同步消费者通过这个 URL 连接到队列,它还设置了预取限制:
tcp://localhost:61616?nms.prefetchPolicy.all=5
我以编程方式检查限制并将其输出到我的日志中,因此我知道语法是正确的并且正在设置它。
我对预取的理解是,一个同步消费者将被发送 x 条消息,然后一旦它确认了其中的 50%,它就会被发送另一批。这不是我看到的行为。
- 我在 URL
中将预取设置为 5
- 我已经用 20 条消息预先填充了一个队列
- 我记录收到的每条消息
- 我在我的代码中注释掉了
message.Acknowledge();
行
所以我认为我应该在我的日志中看到 5 条消息,而不是更多,因为我不承认其中任何一条。
但是,我看到所有 20 条消息都出现在日志中。
我是否误解了预取的工作方式,或者我的代码中是否存在错误?
代码(c#、.Net Core):
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
connection.Start();
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
log.InfoFormat("Prefetch set to:{0}. Maximum configurable:{1}", ((Connection)connection).PrefetchPolicy.QueuePrefetch, PrefetchPolicy.MAX_PREFETCH_SIZE);
while (true)
{
var message = consumer.Receive(TimeSpan.FromSeconds(5));
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
//message.Acknowledge();
}
日志:
[18:07:37 INF] Prefetch set to:5. Maximum configurable:32766
[18:07:38 DBG] Message 0 Received. Attempt:2
[18:07:38 DBG] Message 1 Received. Attempt:2
[18:07:38 DBG] Message 2 Received. Attempt:2
[18:07:38 DBG] Message 3 Received. Attempt:2
[18:07:40 DBG] Message 4 Received. Attempt:2
[18:07:40 DBG] Message 5 Received. Attempt:2
[18:07:40 DBG] Message 6 Received. Attempt:2
[18:07:40 DBG] Message 7 Received. Attempt:2
[18:07:40 DBG] Message 8 Received. Attempt:2
[18:07:40 DBG] Message 9 Received. Attempt:2
[18:07:40 DBG] Message 10 Received. Attempt:2
[18:07:40 DBG] Message 11 Received. Attempt:2
[18:07:40 DBG] Message 12 Received. Attempt:2
[18:07:40 DBG] Message 13 Received. Attempt:2
[18:07:40 DBG] Message 14 Received. Attempt:2
[18:07:40 DBG] Message 15 Received. Attempt:2
[18:07:41 DBG] Message 16 Received. Attempt:2
[18:07:41 DBG] Message 17 Received. Attempt:2
[18:07:41 DBG] Message 18 Received. Attempt:2
[18:07:41 DBG] Message 19 Received. Attempt:2
提前致谢:)
P.S。我正在使用:
- ActiveMQ v5.15.12
- Apache.NMS.ActiveMQ.NetCore v1.7.2 (NuGet)
预取值并不意味着您可以控制队列分派,它只是控制在您的应用程序实际上没有进行任何接收处理或消耗缓慢的情况下有多少缓冲等待交付给您。这允许代理向客户端提供一些消息,然后继续向在第一个处理其预取时到达的其他客户端提供消息。
一旦您开始使用消息,无论您是否确认它们,客户端都会延长预取信用以允许始终存在预取数量大小的积压。这是因为作为消费者,您可能希望批量读取许多消息并一次确认所有消息,这是客户端确认和交易消费所做的。如果我没记错的话,NMS 客户端会以 70% 的速度充值预取信用,而不是 50%,但无论如何,只要您从预取缓冲区中提取了该数量的消息,就会发生这种情况。
如果你想控制从代理发送的消息,那么你需要使用零预取设置和同步接收消费者来确保在任何给定时间只有一条消息可以传送到客户端。
我有一个同步消费者通过这个 URL 连接到队列,它还设置了预取限制:
tcp://localhost:61616?nms.prefetchPolicy.all=5
我以编程方式检查限制并将其输出到我的日志中,因此我知道语法是正确的并且正在设置它。
我对预取的理解是,一个同步消费者将被发送 x 条消息,然后一旦它确认了其中的 50%,它就会被发送另一批。这不是我看到的行为。
- 我在 URL 中将预取设置为 5
- 我已经用 20 条消息预先填充了一个队列
- 我记录收到的每条消息
- 我在我的代码中注释掉了
message.Acknowledge();
行
所以我认为我应该在我的日志中看到 5 条消息,而不是更多,因为我不承认其中任何一条。 但是,我看到所有 20 条消息都出现在日志中。
我是否误解了预取的工作方式,或者我的代码中是否存在错误?
代码(c#、.Net Core):
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
connection.Start();
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
log.InfoFormat("Prefetch set to:{0}. Maximum configurable:{1}", ((Connection)connection).PrefetchPolicy.QueuePrefetch, PrefetchPolicy.MAX_PREFETCH_SIZE);
while (true)
{
var message = consumer.Receive(TimeSpan.FromSeconds(5));
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
//message.Acknowledge();
}
日志:
[18:07:37 INF] Prefetch set to:5. Maximum configurable:32766
[18:07:38 DBG] Message 0 Received. Attempt:2
[18:07:38 DBG] Message 1 Received. Attempt:2
[18:07:38 DBG] Message 2 Received. Attempt:2
[18:07:38 DBG] Message 3 Received. Attempt:2
[18:07:40 DBG] Message 4 Received. Attempt:2
[18:07:40 DBG] Message 5 Received. Attempt:2
[18:07:40 DBG] Message 6 Received. Attempt:2
[18:07:40 DBG] Message 7 Received. Attempt:2
[18:07:40 DBG] Message 8 Received. Attempt:2
[18:07:40 DBG] Message 9 Received. Attempt:2
[18:07:40 DBG] Message 10 Received. Attempt:2
[18:07:40 DBG] Message 11 Received. Attempt:2
[18:07:40 DBG] Message 12 Received. Attempt:2
[18:07:40 DBG] Message 13 Received. Attempt:2
[18:07:40 DBG] Message 14 Received. Attempt:2
[18:07:40 DBG] Message 15 Received. Attempt:2
[18:07:41 DBG] Message 16 Received. Attempt:2
[18:07:41 DBG] Message 17 Received. Attempt:2
[18:07:41 DBG] Message 18 Received. Attempt:2
[18:07:41 DBG] Message 19 Received. Attempt:2
提前致谢:)
P.S。我正在使用:
- ActiveMQ v5.15.12
- Apache.NMS.ActiveMQ.NetCore v1.7.2 (NuGet)
预取值并不意味着您可以控制队列分派,它只是控制在您的应用程序实际上没有进行任何接收处理或消耗缓慢的情况下有多少缓冲等待交付给您。这允许代理向客户端提供一些消息,然后继续向在第一个处理其预取时到达的其他客户端提供消息。
一旦您开始使用消息,无论您是否确认它们,客户端都会延长预取信用以允许始终存在预取数量大小的积压。这是因为作为消费者,您可能希望批量读取许多消息并一次确认所有消息,这是客户端确认和交易消费所做的。如果我没记错的话,NMS 客户端会以 70% 的速度充值预取信用,而不是 50%,但无论如何,只要您从预取缓冲区中提取了该数量的消息,就会发生这种情况。
如果你想控制从代理发送的消息,那么你需要使用零预取设置和同步接收消费者来确保在任何给定时间只有一条消息可以传送到客户端。