QueueClient 中长时间不活动和频繁出现 MessageLockLostException 的时期
Periods of prolonged inactivity and frequent MessageLockLostException in QueueClient
背景
我们有一个以 Azure 服务总线作为消息代理的数据传输解决方案。我们正在通过 x
队列从 x
数据集传输数据 - x
专用 QueueClient
作为发件人。一些发件人以每两秒一条消息的速度发布消息,而另一些发件人每 15 分钟发布一条消息。
数据源端(发件人所在的位置)上的应用程序工作正常,为我们提供了所需的吞吐量。
另一方面,我们有一个应用程序,每个队列有一个 QueueClient 接收器,配置如下:
maxConcurrentCalls
= 1
autoComplete
= true
(如果接收模式 = RECEIVEANDDELETE
)和 false
(如果接收模式 = PEEKLOCK
) - 我们有一些接收器,如果它们意外关闭,将希望保留服务总线队列中的消息。
maxAutoRenewDuration
= 3
分钟(所有队列的锁定持续时间 = 30
秒)
- 单线程执行器服务
注册到每个接收器的 MessageHandler
执行以下操作:
public CompletableFuture<Void> onMessageAsync(final IMessage message) {
// deserialize the message body
final CustomObject customObject = (CustomObject)SerializationUtils.deserialize((byte[])message.getMessageBody().getBinaryData().get(0));
// process processDB1() and processDB2() asynchronously
final List<CompletableFuture<Boolean>> processFutures = new ArrayList<CompletableFuture<Boolean>>();
processFutures.add(processDB1(customObject)); // processDB1() returns Boolean
processFutures.add(processDB2(customObject)); // processDB2() returns Boolean
// join both the completablefutures to get the result Booleans
List<Boolean> results = CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[processFutures.size()])).thenApply(future -> processFutures.stream()
.map(CompletableFuture<Boolean>::join).collect(Collectors.toList())
if (results.contains(false)) {
// dead-letter the message if results contains false
return getQueueClient().deadLetterAsync(message.getLockToken());
} else {
// complete the message otherwise
getQueueClient().completeAsync(message.getLockToken());
}
}
我们测试了以下场景:
场景 1 - 接收模式 = RECEIVEANDDELETE
,消息发布率:30/ minute
预期行为
应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。
实际行为
我们观察到来自 QueueClient 的随机、长时间不活动 - 从几分钟到几小时不等 - 没有来自服务总线命名空间的传出消息(在指标图表上观察到)并且没有相同时间段的消费记录!
场景 2 - 接收模式 = PEEKLOCK
,消息发布率:30/ minute
预期行为
应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。
实际行为
进入应用程序 运行 20-30 分钟后,我们不断看到 MessageLockLostException
。
我们尝试执行以下操作 -
- 我们将预取计数(从 20 * 处理速率 - 如最佳实践指南中所述)减少到最低限度(在一个测试周期内甚至
0
),以减少编号。为客户端锁定的消息数
- 将
maxAutoRenewDuration
增加到 5 分钟 - 我们的 processDB1()
和 processDB2()
在几乎 90% 的情况下不会超过一两秒 - 所以,我认为30 秒的锁定持续时间和 maxAutoRenewDuration
在这里不是问题。
- 删除了阻塞
CompletableFuture.get()
并使处理同步。
None 这些调整帮助我们解决了这个问题。我们观察到 COMPLETE
或 RENEWMESSAGELOCK
正在抛出 MessageLockLostException
.
我们需要帮助寻找以下问题的答案:
- 为什么场景 1 中的
QueueClient
会长时间不活动?
- 我们怎么知道
MessageLockLostException
被抛出,因为锁确实已经过期?我们怀疑锁不会过早过期,因为我们的处理会在一两秒内发生。禁用预取也没有为我们解决这个问题。
版本和服务总线详细信息
- Java -
openjdk-11-jre
- Azure 服务总线命名空间层:
Standard
- Java SDK 版本 -
3.4.0
对于场景 1:
如果您启用了 duplicate detection history,则可能会按照以下解释的情况发生此行为:
我启用了 30 秒。我经常使用重复的消息访问服务总线(我的案例消息与来自客户端的相同消息 ID - 30 / 分钟)。我会看到 window 没有 activity 离职。尽管消息是在服务总线上从发送客户端接收到的,但我无法在传出消息中看到它们。您可能会检查是否再次遇到被过滤的重复消息 - 反过来导致 activity 传出。
另请注意:创建队列后,您无法enable/disable重复检测。您只能在创建队列时这样做。
问题不在于 QueueClient
对象本身。这是我们从 MessageHandler
中触发的进程:processDB1(customObject)
和 processDB2(customObject)
。由于这些进程未优化,消息消耗下降并且锁 gor 过期(在 peek-lock 模式下),因为处理程序花费更多时间(相对于消息发布到队列的速率)完成这些操作。
流程优化后,消耗和完成度(peek-lock模式下)都很好。
背景
我们有一个以 Azure 服务总线作为消息代理的数据传输解决方案。我们正在通过 x
队列从 x
数据集传输数据 - x
专用 QueueClient
作为发件人。一些发件人以每两秒一条消息的速度发布消息,而另一些发件人每 15 分钟发布一条消息。
数据源端(发件人所在的位置)上的应用程序工作正常,为我们提供了所需的吞吐量。
另一方面,我们有一个应用程序,每个队列有一个 QueueClient 接收器,配置如下:
maxConcurrentCalls
=1
autoComplete
=true
(如果接收模式 =RECEIVEANDDELETE
)和false
(如果接收模式 =PEEKLOCK
) - 我们有一些接收器,如果它们意外关闭,将希望保留服务总线队列中的消息。maxAutoRenewDuration
=3
分钟(所有队列的锁定持续时间 =30
秒)- 单线程执行器服务
注册到每个接收器的 MessageHandler
执行以下操作:
public CompletableFuture<Void> onMessageAsync(final IMessage message) {
// deserialize the message body
final CustomObject customObject = (CustomObject)SerializationUtils.deserialize((byte[])message.getMessageBody().getBinaryData().get(0));
// process processDB1() and processDB2() asynchronously
final List<CompletableFuture<Boolean>> processFutures = new ArrayList<CompletableFuture<Boolean>>();
processFutures.add(processDB1(customObject)); // processDB1() returns Boolean
processFutures.add(processDB2(customObject)); // processDB2() returns Boolean
// join both the completablefutures to get the result Booleans
List<Boolean> results = CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[processFutures.size()])).thenApply(future -> processFutures.stream()
.map(CompletableFuture<Boolean>::join).collect(Collectors.toList())
if (results.contains(false)) {
// dead-letter the message if results contains false
return getQueueClient().deadLetterAsync(message.getLockToken());
} else {
// complete the message otherwise
getQueueClient().completeAsync(message.getLockToken());
}
}
我们测试了以下场景:
场景 1 - 接收模式 = RECEIVEANDDELETE
,消息发布率:30/ minute
预期行为
应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。
实际行为
我们观察到来自 QueueClient 的随机、长时间不活动 - 从几分钟到几小时不等 - 没有来自服务总线命名空间的传出消息(在指标图表上观察到)并且没有相同时间段的消费记录!
场景 2 - 接收模式 = PEEKLOCK
,消息发布率:30/ minute
预期行为
应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。
实际行为
进入应用程序 运行 20-30 分钟后,我们不断看到 MessageLockLostException
。
我们尝试执行以下操作 -
- 我们将预取计数(从 20 * 处理速率 - 如最佳实践指南中所述)减少到最低限度(在一个测试周期内甚至
0
),以减少编号。为客户端锁定的消息数 - 将
maxAutoRenewDuration
增加到 5 分钟 - 我们的processDB1()
和processDB2()
在几乎 90% 的情况下不会超过一两秒 - 所以,我认为30 秒的锁定持续时间和maxAutoRenewDuration
在这里不是问题。 - 删除了阻塞
CompletableFuture.get()
并使处理同步。
None 这些调整帮助我们解决了这个问题。我们观察到 COMPLETE
或 RENEWMESSAGELOCK
正在抛出 MessageLockLostException
.
我们需要帮助寻找以下问题的答案:
- 为什么场景 1 中的
QueueClient
会长时间不活动? - 我们怎么知道
MessageLockLostException
被抛出,因为锁确实已经过期?我们怀疑锁不会过早过期,因为我们的处理会在一两秒内发生。禁用预取也没有为我们解决这个问题。
版本和服务总线详细信息
- Java -
openjdk-11-jre
- Azure 服务总线命名空间层:
Standard
- Java SDK 版本 -
3.4.0
对于场景 1:
如果您启用了 duplicate detection history,则可能会按照以下解释的情况发生此行为:
我启用了 30 秒。我经常使用重复的消息访问服务总线(我的案例消息与来自客户端的相同消息 ID - 30 / 分钟)。我会看到 window 没有 activity 离职。尽管消息是在服务总线上从发送客户端接收到的,但我无法在传出消息中看到它们。您可能会检查是否再次遇到被过滤的重复消息 - 反过来导致 activity 传出。
另请注意:创建队列后,您无法enable/disable重复检测。您只能在创建队列时这样做。
问题不在于 QueueClient
对象本身。这是我们从 MessageHandler
中触发的进程:processDB1(customObject)
和 processDB2(customObject)
。由于这些进程未优化,消息消耗下降并且锁 gor 过期(在 peek-lock 模式下),因为处理程序花费更多时间(相对于消息发布到队列的速率)完成这些操作。
流程优化后,消耗和完成度(peek-lock模式下)都很好。