如何使用 IQueryable 和 IAsyncCollector 锁定 Azure Functions 中的 Azure Table 分区?
How can I Lock an Azure Table partition in an Azure Function using IQueryable and IAsyncCollector?
我正在摆弄 Azure Functions,将其与 CQRS 和事件源相结合。我正在使用 Azure Table 存储作为事件存储。下面的代码是一个简化版本,不会分散对问题的注意力。
我对任何代码提示都不感兴趣,因为这不是代码的最终版本。
public static async Task Run(BrokeredMessage commandBrokeredMessage, IQueryable<DomainEvent> eventsQueryable, IAsyncCollector<IDomainEvent> eventsCollector, TraceWriter log)
{
var command = commandBrokeredMessage.GetBody<FooCommand>();
var committedEvents = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId);
var expectedVersion = committedEvents .Max(e => e.Version);
// some domain logic that will result in domain events
var uncommittedEvents = HandleFooCommand(command, committedEvents);
// using(Some way to lock partition)
// {
var currentVersion = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId).Max(e => e.Version);
if(expectedVersion != currentVersion)
{
throw new ConcurrencyException("expected version is not the same as current version");
}
var i = currentVersion;
foreach (var domainEvent in uncommittedEvents.OrderBy(e => e.Timestamp))
{
i++;
domainEvent.Version = i;
await eventsCollector.AddAsync(domainEvent);
}
// }
}
public class DomainEvent : TableEntity
{
private string eventType;
public virtual string EventType
{
get { return eventType ?? (eventType = GetType().UnderlyingSystemType.Name); }
set { eventType = value; }
}
public long Version { get; set; }
}
我的努力
公平地说,我无法尝试任何事情,因为我不知道从哪里开始,甚至不知道这是否可能。我做了一些研究并没有解决我的问题,但可以帮助你解决这个问题。
Azure Table 支持锁定吗?
是的,他们这样做:Managing Concurrency in Microsoft Azure Storage。这叫做租赁,但我不知道如何在 Azure 函数中实现它。
其他来源
提示、建议、备选方案
我总是乐于接受任何关于如何解决问题的建议,但我不能接受这些作为我问题的答案。除非我的问题的答案是 "no",否则我无法将备选方案标记为答案。我不是在寻找解决问题的最佳方法,我希望它按照我设计的方式工作。我知道这很固执,但这是 practice/fiddling.
Blob 租约确实可以很好地满足您要完成的任务(Functions 运行时实际上在内部广泛使用了它)。
如果在处理分区之前,您获得了 blob 的租约(按照惯例,以分区命名的 blob,或类似的名称),您将能够确保只有给定的函数在处理那个分区。
您链接到的文章确实显示了租约获取和释放的示例,您可以在 documentation.
中找到更多信息
您要确保的一件事是在离开锁定范围之前刷新您的收集器(通过对其调用FlushAsync
)
希望对您有所帮助!
我正在摆弄 Azure Functions,将其与 CQRS 和事件源相结合。我正在使用 Azure Table 存储作为事件存储。下面的代码是一个简化版本,不会分散对问题的注意力。
我对任何代码提示都不感兴趣,因为这不是代码的最终版本。
public static async Task Run(BrokeredMessage commandBrokeredMessage, IQueryable<DomainEvent> eventsQueryable, IAsyncCollector<IDomainEvent> eventsCollector, TraceWriter log)
{
var command = commandBrokeredMessage.GetBody<FooCommand>();
var committedEvents = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId);
var expectedVersion = committedEvents .Max(e => e.Version);
// some domain logic that will result in domain events
var uncommittedEvents = HandleFooCommand(command, committedEvents);
// using(Some way to lock partition)
// {
var currentVersion = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId).Max(e => e.Version);
if(expectedVersion != currentVersion)
{
throw new ConcurrencyException("expected version is not the same as current version");
}
var i = currentVersion;
foreach (var domainEvent in uncommittedEvents.OrderBy(e => e.Timestamp))
{
i++;
domainEvent.Version = i;
await eventsCollector.AddAsync(domainEvent);
}
// }
}
public class DomainEvent : TableEntity
{
private string eventType;
public virtual string EventType
{
get { return eventType ?? (eventType = GetType().UnderlyingSystemType.Name); }
set { eventType = value; }
}
public long Version { get; set; }
}
我的努力
公平地说,我无法尝试任何事情,因为我不知道从哪里开始,甚至不知道这是否可能。我做了一些研究并没有解决我的问题,但可以帮助你解决这个问题。
Azure Table 支持锁定吗? 是的,他们这样做:Managing Concurrency in Microsoft Azure Storage。这叫做租赁,但我不知道如何在 Azure 函数中实现它。
其他来源
提示、建议、备选方案
我总是乐于接受任何关于如何解决问题的建议,但我不能接受这些作为我问题的答案。除非我的问题的答案是 "no",否则我无法将备选方案标记为答案。我不是在寻找解决问题的最佳方法,我希望它按照我设计的方式工作。我知道这很固执,但这是 practice/fiddling.
Blob 租约确实可以很好地满足您要完成的任务(Functions 运行时实际上在内部广泛使用了它)。
如果在处理分区之前,您获得了 blob 的租约(按照惯例,以分区命名的 blob,或类似的名称),您将能够确保只有给定的函数在处理那个分区。
您链接到的文章确实显示了租约获取和释放的示例,您可以在 documentation.
中找到更多信息您要确保的一件事是在离开锁定范围之前刷新您的收集器(通过对其调用FlushAsync
)
希望对您有所帮助!