使用 receiveMessages 处理批处理的正确方法
Correct way to process batches using receiveMessages
我们正在使用 @azure/service-bus 程序包来处理来自多个主题的消息批次。
我们用来每 2 秒从主题中获取 20 条消息的代码如下所示。
let isProcessing: boolean = false;
setInterval(async () => {
if (isProcessing === false) {
isProcessing = true;
try {
const messages: Array<ServiceBusMessage>
= await receiver.receiveMessages(Configuration.SB.batchSize as number);
if (messages.length > 0) {
this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);
await Promise.all([
...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
]).catch(error => {
this.logger.error(error.message, error);
});
}
isProcessing = false;
} catch (error) {
this.logger.error(error.message, error);
isProcessing = false;
}
}
}, Configuration.SB.tickInterval as number);
我的问题是 - 这是最好的方法吗?有更好的方法吗? 它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,我正在尝试锻炼它的实现方式
感谢您的帮助
It works and is fairly performant BUT I think we are losing receiveAndDelete messages sometimes and I am trying to workout if its our implementation
有两种接收消息的模式
ReceiveAndDelete
不安全
- 安全
PeekLock
当使用ReceiveAndDelete
模式时,客户端收到消息后,会自动从服务器上删除。所以这是 最多一次 交付。
对于 PeekLock
,一条消息 "leased" 到客户端最多 5 分钟,客户端必须通过请求消息完成或 cancelling/dead-lettering 来确认成功处理,如果它无法处理它。如果这些操作中的 none 发生在定义的租用时间内(不必严格为 5 分钟,可以更短),将重试消息直到达到最大投递尝试次数 (MaxDeliveryCount
) 超出并且消息是死信的。请注意,消息永远不会丢失。即使它无法处理并且是死信。因此,这是 at-least-once-delivery 这可能更适合您的场景。它会对您编写客户端代码的方式产生轻微影响,但不会产生重大变化。
我们正在使用 @azure/service-bus 程序包来处理来自多个主题的消息批次。
我们用来每 2 秒从主题中获取 20 条消息的代码如下所示。
let isProcessing: boolean = false;
setInterval(async () => {
if (isProcessing === false) {
isProcessing = true;
try {
const messages: Array<ServiceBusMessage>
= await receiver.receiveMessages(Configuration.SB.batchSize as number);
if (messages.length > 0) {
this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);
await Promise.all([
...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
]).catch(error => {
this.logger.error(error.message, error);
});
}
isProcessing = false;
} catch (error) {
this.logger.error(error.message, error);
isProcessing = false;
}
}
}, Configuration.SB.tickInterval as number);
我的问题是 - 这是最好的方法吗?有更好的方法吗? 它可以工作并且性能相当好,但我认为我们有时会丢失 receiveAndDelete 消息,我正在尝试锻炼它的实现方式
感谢您的帮助
It works and is fairly performant BUT I think we are losing receiveAndDelete messages sometimes and I am trying to workout if its our implementation
有两种接收消息的模式
ReceiveAndDelete
不安全
- 安全
PeekLock
当使用ReceiveAndDelete
模式时,客户端收到消息后,会自动从服务器上删除。所以这是 最多一次 交付。
对于 PeekLock
,一条消息 "leased" 到客户端最多 5 分钟,客户端必须通过请求消息完成或 cancelling/dead-lettering 来确认成功处理,如果它无法处理它。如果这些操作中的 none 发生在定义的租用时间内(不必严格为 5 分钟,可以更短),将重试消息直到达到最大投递尝试次数 (MaxDeliveryCount
) 超出并且消息是死信的。请注意,消息永远不会丢失。即使它无法处理并且是死信。因此,这是 at-least-once-delivery 这可能更适合您的场景。它会对您编写客户端代码的方式产生轻微影响,但不会产生重大变化。