QueueClient 中长时间不活动和频繁出现 MessageLockLostException 的时期

Periods of prolonged inactivity and frequent MessageLockLostException in QueueClient

背景

我们有一个以 Azure 服务总线作为消息代理的数据传输解决方案。我们正在通过 x 队列从 x 数据集传输数据 - x 专用 QueueClient 作为发件人。一些发件人以每两秒一条消息的速度发布消息,而另一些发件人每 15 分钟发布一条消息。

数据源端(发件人所在的位置)上的应用程序工作正常,为我们提供了所需的吞吐量。

另一方面,我们有一个应用程序,每个队列有一个 QueueClient 接收器,配置如下:

注册到每个接收器的 MessageHandler 执行以下操作:

public CompletableFuture<Void> onMessageAsync(final IMessage message) {

    // deserialize the message body
    final CustomObject customObject = (CustomObject)SerializationUtils.deserialize((byte[])message.getMessageBody().getBinaryData().get(0));

    // process processDB1() and processDB2() asynchronously
    final List<CompletableFuture<Boolean>> processFutures = new ArrayList<CompletableFuture<Boolean>>();

    processFutures.add(processDB1(customObject));  // processDB1() returns Boolean
    processFutures.add(processDB2(customObject)); // processDB2() returns Boolean

    // join both the completablefutures to get the result Booleans
    List<Boolean> results = CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[processFutures.size()])).thenApply(future -> processFutures.stream()
        .map(CompletableFuture<Boolean>::join).collect(Collectors.toList())

    if (results.contains(false)) {
        // dead-letter the message if results contains false
        return getQueueClient().deadLetterAsync(message.getLockToken());
    } else {
        // complete the message otherwise
        getQueueClient().completeAsync(message.getLockToken());
    }
}

我们测试了以下场景:

场景 1 - 接收模式 = RECEIVEANDDELETE,消息发布率:30/ minute

预期行为

应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。

实际行为

我们观察到来自 QueueClient 的随机、长时间不活动 - 从几分钟到几小时不等 - 没有来自服务总线命名空间的传出消息(在指标图表上观察到)并且没有相同时间段的消费记录!

场景 2 - 接收模式 = PEEKLOCK,消息发布率:30/ minute

预期行为

应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。

实际行为

进入应用程序 运行 20-30 分钟后,我们不断看到 MessageLockLostException

我们尝试执行以下操作 -

  1. 我们将预取计数(从 20 * 处理速率 - 如最佳实践指南中所述)减少到最低限度(在一个测试周期内甚至 0),以减少编号。为客户端锁定的消息数
  2. maxAutoRenewDuration 增加到 5 分钟 - 我们的 processDB1()processDB2() 在几乎 90% 的情况下不会超过一两秒 - 所以,我认为30 秒的锁定持续时间和 maxAutoRenewDuration 在这里不是问题。
  3. 删除了阻塞CompletableFuture.get()并使处理同步。

None 这些调整帮助我们解决了这个问题。我们观察到 COMPLETERENEWMESSAGELOCK 正在抛出 MessageLockLostException.

我们需要帮助寻找以下问题的答案:

  1. 为什么场景 1 中的 QueueClient 会长时间不活动?
  2. 我们怎么知道 MessageLockLostException 被抛出,因为锁确实已经过期?我们怀疑锁不会过早过期,因为我们的处理会在一两秒内发生。禁用预取也没有为我们解决这个问题。

版本和服务总线详细信息

对于场景 1:

如果您启用了 duplicate detection history,则可能会按照以下解释的情况发生此行为:

我启用了 30 秒。我经常使用重复的消息访问服务总线(我的案例消息与来自客户端的相同消息 ID - 30 / 分钟)。我会看到 window 没有 activity 离职。尽管消息是在服务总线上从发送客户端接收到的,但我无法在传出消息中看到它们。您可能会检查是否再次遇到被过滤的重复消息 - 反过来导致 activity 传出。

另请注意:创建队列后,您无法enable/disable重复检测。您只能在创建队列时这样做。

问题不在于 QueueClient 对象本身。这是我们从 MessageHandler 中触发的进程:processDB1(customObject)processDB2(customObject)。由于这些进程未优化,消息消耗下降并且锁 gor 过期(在 peek-lock 模式下),因为处理程序花费更多时间(相对于消息发布到队列的速率)完成这些操作。

流程优化后,消耗和完成度(peek-lock模式下)都很好。