处理服务总线 Message.Complete() 异常
Handling service bus Message.Complete() exceptions
考虑这样的场景,一个启用了消息重复数据删除的 Azure 服务总线,有一个主题,一个订阅和一个订阅该队列的应用程序。
如何确保应用程序从队列中接收消息一次且仅一次?
这是我在我的应用程序中用来接收消息的代码:
public abstract class ServiceBusListener<T> : IServiceBusListener
{
private SubscriptionClient subscriptionClient;
// ..... snip
private void ReceiveMessages()
{
message = this.subscriptionClient.Receive(TimeSpan.FromSeconds(5));
if (message != null)
{
T payload = message.GetBody<T>(message);
try
{
DoWork(payload);
message.Complete();
}
catch (Exception exception)
{
// message.Complete failed
}
}
}
}
我预见到的问题是,如果 message.Complete()
由于某种原因失败,那么刚刚处理过的消息将保留在 Azure 中的订阅队列中。当再次调用 ReceiveMessages()
时,它将从队列中选取相同的消息,应用程序将再次执行相同的工作。
虽然最好的解决方案是具有幂等域逻辑 (DoWork(payload)
),但在这种情况下很难编写。
我能看到确保一次且仅一次交付到应用程序的唯一方法是构建另一个队列作为 Azure 服务总线和应用程序之间的中介。我相信这叫做 'Durable client-side queue'.
但是我可以看出,对于许多使用 Azure 服务总线的应用程序来说,这将是一个潜在的问题,所以持久的客户端队列是唯一的解决方案吗?
当您使消息出列时的默认行为称为 "Peek-Lock" 它会锁定该消息,以便在您处理它时其他人无法获取它,并在您提交时将其删除。如果您提交失败,它将解锁,因此可以再次拾取它。这可能是你正在经历的。 您可以更改使用 "Receive and Delete" 的行为,这会在您收到要处理的文件后立即将其从队列中删除。
https://msdn.microsoft.com/en-us/library/azure/hh780770.aspx
如果您在消息处理中包含检测消息是否已成功处理或已达到的阶段的逻辑,则可以继续使用单个订阅。
例如,我使用服务总线消息将来自外部支付系统的支付插入到 CRM 系统中。在插入之前,消息处理逻辑首先检查付款是否已存在于 CRM 中(使用与付款关联的唯一 ID)。这是必需的,因为偶尔付款会成功添加到 CRM,但不会报告回来(超时或连接)。在提取消息时使用 Receive/Delete 意味着付款可能会丢失,而不检查付款是否已经存在可能会导致重复付款。
如果这不可能,那么我应用的另一个解决方案是更新 table 存储以记录处理消息的进度。接收消息时,会检查 table 以查看是否已完成任何阶段。这允许消息从之前到达的阶段继续。
您描述的情况最可能的原因是 DoWork 花费的时间超过了消息的锁定时间。可以将消息锁定超时调整为安全地超过预期的 DoWork 周期的值。
如果您能够跟踪处理消息锁到期所花费的时间,也可以在处理程序中对消息调用 RenewLock。
也许我误解了第二个队列的设计原则,但它似乎同样容易受到您概述的原始场景的影响。
在不知道您的 DoWork() 涉及什么的情况下很难给出明确的答案,但我认为上述一种或多种组合是更好的解决方案。
我在我负责的一个大型 Azure 平台中遇到了类似的挑战。我使用补偿事务模式 (https://msdn.microsoft.com/en-us/library/dn589804.aspx), and Event sourcing Pattern (https://msdn.microsoft.com/en-us/library/dn589792.aspx) 体现的概念的逻辑组合。具体如何结合这些概念会有所不同,但最终,您可能需要根据自己的 "rollback" 逻辑进行规划,或者检测到先前的进程已 100% 成功完成减去消息的删除。如果有什么可以预先检查的,您会知道消息根本没有被删除,然后完成它并继续。 "check" 有多贵可能使这成为一个坏主意。您甚至可以 "create" 人为的最后一步,例如向数据库添加一行,仅当 DoWork 到达末尾时才 运行 。然后,您可以在处理任何其他消息之前检查该行。
IMO,最好的方法是确保 DoWork() 中的所有步骤都检查是否存在已经执行的工作(如果可能)。例如,如果它正在创建 DB table、运行 和 "IF NOT EXISTS(SELECT TABLE_NAME FROM INFORMATION_SCHEMA..."。在那种情况下,即使发生这种情况的可能性很小,再次处理消息也是安全的。
我使用的其他方法是存储先前 X 条消息(即 10,000 条)的 MessageID(每条消息上的顺序 bigint),然后在继续处理消息之前检查它们是否存在(NOT IN)。没有你想象的那么贵而且非常安全。如果找到,只需 Complete() 消息并继续。在其他情况下,我使用 "starting" 类型状态更新消息(在某些队列类型中内联,在其他队列类型中持久化),然后继续。如果您阅读了一条消息并且该消息已设置为 "started",您就会知道某些事情要么失败了,要么没有正确清除。
抱歉,这不是一个明确的答案,但有很多注意事项。
最诚挚的问候...
考虑这样的场景,一个启用了消息重复数据删除的 Azure 服务总线,有一个主题,一个订阅和一个订阅该队列的应用程序。
如何确保应用程序从队列中接收消息一次且仅一次?
这是我在我的应用程序中用来接收消息的代码:
public abstract class ServiceBusListener<T> : IServiceBusListener
{
private SubscriptionClient subscriptionClient;
// ..... snip
private void ReceiveMessages()
{
message = this.subscriptionClient.Receive(TimeSpan.FromSeconds(5));
if (message != null)
{
T payload = message.GetBody<T>(message);
try
{
DoWork(payload);
message.Complete();
}
catch (Exception exception)
{
// message.Complete failed
}
}
}
}
我预见到的问题是,如果 message.Complete()
由于某种原因失败,那么刚刚处理过的消息将保留在 Azure 中的订阅队列中。当再次调用 ReceiveMessages()
时,它将从队列中选取相同的消息,应用程序将再次执行相同的工作。
虽然最好的解决方案是具有幂等域逻辑 (DoWork(payload)
),但在这种情况下很难编写。
我能看到确保一次且仅一次交付到应用程序的唯一方法是构建另一个队列作为 Azure 服务总线和应用程序之间的中介。我相信这叫做 'Durable client-side queue'.
但是我可以看出,对于许多使用 Azure 服务总线的应用程序来说,这将是一个潜在的问题,所以持久的客户端队列是唯一的解决方案吗?
当您使消息出列时的默认行为称为 "Peek-Lock" 它会锁定该消息,以便在您处理它时其他人无法获取它,并在您提交时将其删除。如果您提交失败,它将解锁,因此可以再次拾取它。这可能是你正在经历的。 您可以更改使用 "Receive and Delete" 的行为,这会在您收到要处理的文件后立即将其从队列中删除。 https://msdn.microsoft.com/en-us/library/azure/hh780770.aspx
如果您在消息处理中包含检测消息是否已成功处理或已达到的阶段的逻辑,则可以继续使用单个订阅。
例如,我使用服务总线消息将来自外部支付系统的支付插入到 CRM 系统中。在插入之前,消息处理逻辑首先检查付款是否已存在于 CRM 中(使用与付款关联的唯一 ID)。这是必需的,因为偶尔付款会成功添加到 CRM,但不会报告回来(超时或连接)。在提取消息时使用 Receive/Delete 意味着付款可能会丢失,而不检查付款是否已经存在可能会导致重复付款。
如果这不可能,那么我应用的另一个解决方案是更新 table 存储以记录处理消息的进度。接收消息时,会检查 table 以查看是否已完成任何阶段。这允许消息从之前到达的阶段继续。
您描述的情况最可能的原因是 DoWork 花费的时间超过了消息的锁定时间。可以将消息锁定超时调整为安全地超过预期的 DoWork 周期的值。 如果您能够跟踪处理消息锁到期所花费的时间,也可以在处理程序中对消息调用 RenewLock。
也许我误解了第二个队列的设计原则,但它似乎同样容易受到您概述的原始场景的影响。
在不知道您的 DoWork() 涉及什么的情况下很难给出明确的答案,但我认为上述一种或多种组合是更好的解决方案。
我在我负责的一个大型 Azure 平台中遇到了类似的挑战。我使用补偿事务模式 (https://msdn.microsoft.com/en-us/library/dn589804.aspx), and Event sourcing Pattern (https://msdn.microsoft.com/en-us/library/dn589792.aspx) 体现的概念的逻辑组合。具体如何结合这些概念会有所不同,但最终,您可能需要根据自己的 "rollback" 逻辑进行规划,或者检测到先前的进程已 100% 成功完成减去消息的删除。如果有什么可以预先检查的,您会知道消息根本没有被删除,然后完成它并继续。 "check" 有多贵可能使这成为一个坏主意。您甚至可以 "create" 人为的最后一步,例如向数据库添加一行,仅当 DoWork 到达末尾时才 运行 。然后,您可以在处理任何其他消息之前检查该行。
IMO,最好的方法是确保 DoWork() 中的所有步骤都检查是否存在已经执行的工作(如果可能)。例如,如果它正在创建 DB table、运行 和 "IF NOT EXISTS(SELECT TABLE_NAME FROM INFORMATION_SCHEMA..."。在那种情况下,即使发生这种情况的可能性很小,再次处理消息也是安全的。
我使用的其他方法是存储先前 X 条消息(即 10,000 条)的 MessageID(每条消息上的顺序 bigint),然后在继续处理消息之前检查它们是否存在(NOT IN)。没有你想象的那么贵而且非常安全。如果找到,只需 Complete() 消息并继续。在其他情况下,我使用 "starting" 类型状态更新消息(在某些队列类型中内联,在其他队列类型中持久化),然后继续。如果您阅读了一条消息并且该消息已设置为 "started",您就会知道某些事情要么失败了,要么没有正确清除。
抱歉,这不是一个明确的答案,但有很多注意事项。
最诚挚的问候...