使用 receiveMessages 处理批处理的正确方法

Correct way to process batches using receiveMessages

我们正在使用 @azure/service-bus 程序包来处理来自多个主题的消息批次。

我们用来每 2 秒从主题中获取 20 条消息的代码如下所示。

       let isProcessing: boolean = false;

       setInterval(async () => {

        if (isProcessing === false) {

            isProcessing = true;

            try {

                const messages: Array<ServiceBusMessage>
                    = await receiver.receiveMessages(Configuration.SB.batchSize as number);

                if (messages.length > 0) {

                    this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);

                    await Promise.all([
                        ...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
                    ]).catch(error => {
                        this.logger.error(error.message, error);
                    });

                }

                isProcessing = false;

            } catch (error) {

                this.logger.error(error.message, error);

                isProcessing = false;

            }

        }

    }, Configuration.SB.tickInterval as number);

我的问题是 - 这是最好的方法吗?有更好的方法吗? 它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,我正在尝试锻炼它的实现方式

感谢您的帮助

It works and is fairly performant BUT I think we are losing receiveAndDelete messages sometimes and I am trying to workout if its our implementation

有两种接收消息的模式

  • ReceiveAndDelete
  • 不安全
  • 安全 PeekLock

当使用ReceiveAndDelete模式时,客户端收到消息后,会自动从服务器上删除。所以这是 最多一次 交付。

对于 PeekLock,一条消息 "leased" 到客户端最多 5 分钟,客户端必须通过请求消息完成或 cancelling/dead-lettering 来确认成功处理,如果它无法处理它。如果这些操作中的 none 发生在定义的租用时间内(不必严格为 5 分钟,可以更短),将重试消息直到达到最大投递尝试次数 (MaxDeliveryCount ) 超出并且消息是死信的。请注意,消息永远不会丢失。即使它无法处理并且是死信。因此,这是 at-least-once-delivery 这可能更适合您的场景。它会对您编写客户端代码的方式产生轻微影响,但不会产生重大变化。