如何使用 IQueryable 和 IAsyncCollector 锁定 Azure Functions 中的 Azure Table 分区?

How can I Lock an Azure Table partition in an Azure Function using IQueryable and IAsyncCollector?

我正在摆弄 Azure Functions,将其与 CQRS 和事件源相结合。我正在使用 Azure Table 存储作为事件存储。下面的代码是一个简化版本,不会分散对问题的注意力。

我对任何代码提示都不感兴趣,因为这不是代码的最终版本。

public static async Task Run(BrokeredMessage commandBrokeredMessage, IQueryable<DomainEvent> eventsQueryable, IAsyncCollector<IDomainEvent> eventsCollector, TraceWriter log)
{
    var command = commandBrokeredMessage.GetBody<FooCommand>();
    var committedEvents = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId);
    var expectedVersion = committedEvents .Max(e => e.Version);

    // some domain logic that will result in domain events
    var uncommittedEvents = HandleFooCommand(command, committedEvents);

    // using(Some way to lock partition)
    // {
    var currentVersion = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId).Max(e => e.Version);

    if(expectedVersion != currentVersion)
    {
        throw new ConcurrencyException("expected version is not the same as current version");
    }

    var i = currentVersion;
    foreach (var domainEvent in uncommittedEvents.OrderBy(e => e.Timestamp))
    {
        i++;
        domainEvent.Version = i;
        await eventsCollector.AddAsync(domainEvent);
    }
    // }
}

public class DomainEvent : TableEntity
{
    private string eventType;

    public virtual string EventType
    {
        get { return eventType ?? (eventType = GetType().UnderlyingSystemType.Name); }
        set { eventType = value; }
    }

    public long Version { get; set; }
}

我的努力

公平地说,我无法尝试任何事情,因为我不知道从哪里开始,甚至不知道这是否可能。我做了一些研究并没有解决我的问题,但可以帮助你解决这个问题。

Azure Table 支持锁定吗? 是的,他们这样做:Managing Concurrency in Microsoft Azure Storage。这叫做租赁,但我不知道如何在 Azure 函数中实现它。

其他来源

提示、建议、备选方案

我总是乐于接受任何关于如何解决问题的建议,但我不能接受这些作为我问题的答案。除非我的问题的答案是 "no",否则我无法将备选方案标记为答案。我不是在寻找解决问题的最佳方法,我希望它按照我设计的方式工作。我知道这很固执,但这是 practice/fiddling.

Blob 租约确实可以很好地满足您要完成的任务(Functions 运行时实际上在内部广泛使用了它)。

如果在处理分区之前,您获得了 blob 的租约(按照惯例,以分区命名的 blob,或类似的名称),您将能够确保只有给定的函数在处理那个分区。

您链接到的文章确实显示了租约获取和释放的示例,您可以在 documentation.

中找到更多信息

您要确保的一件事是在离开锁定范围之前刷新您的收集器(通过对其调用FlushAsync

希望对您有所帮助!