Service Fabric 可靠队列长操作
Service Fabric reliable queue long operation
我正在尝试了解服务结构的一些最佳实践。
如果我有一个由 Web 服务或其他机制添加的队列和一个后端任务来处理该队列,那么在后台处理长 运行 操作的最佳方法是什么。
- 在一个事务中使用 TryPeekAsync,进行处理,然后如果成功则使用 TryDequeueAsync 最终出列。
- 使用 TryDequeueAsync 删除项目,将其放入字典,然后在完成后从字典中删除。在服务启动时,检查
队列前任何待处理的字典。
两种方式都感觉有点不对劲,不知道有没有更好的办法
一个选项是在 RunAsync 中处理队列,大致如下:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var store = await StateManager.GetOrAddAsync<IReliableQueue<T>>("MyStore").ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested)
{
using (var tx = StateManager.CreateTransaction())
{
var itemFromQueue = await store.TryDequeueAsync(tx).ConfigureAwait(false);
if (!itemFromQueue.HasValue)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
continue;
}
// Process item here
// Remmber to clone the dequeued item if it is a custom type and you are going to mutate it.
// If success, await tx.CommitAsync();
// If failure to process, either let it run out of the Using transaction scope, or call tx.Abort();
}
}
}
关于克隆出列项目的评论,如果你要改变它,请查看 "Recommendations" 部分:
https://azure.microsoft.com/en-us/documentation/articles/service-fabric-reliable-services-reliable-collections/
可靠集合(队列和字典)的一个限制是每个分区只有 1 个并行度。因此,对于高 activity 队列,它可能不是最佳解决方案。这可能是您 运行 遇到的问题。
我们一直在做的是在写入量非常低的情况下使用 ReliableQueues。对于需要持久性和规模的更高吞吐量队列,我们使用 ServiceBus 主题。这也给了我们一个优势,如果一个服务只是因为有 ReliableQueue 而有状态,那么它现在可以变成无状态的。尽管这增加了对第三方服务(在本例中为 ServiceBus)的依赖性,但这可能不适合您。
另一种选择是创建一个持久的 pub/sub 实现来充当队列。我之前用 actor 做过测试,这似乎是一个可行的选择,没有花太多时间,因为我们没有任何依赖于 ServiceBus 的问题。这是关于
的另一个 SO
如果速度很慢,请使用 2 个队列。一个快速的队列用于不间断地存储工作,另一个慢的队列用于处理它。 RunAsync 用于将消息从快移动到慢。
我正在尝试了解服务结构的一些最佳实践。
如果我有一个由 Web 服务或其他机制添加的队列和一个后端任务来处理该队列,那么在后台处理长 运行 操作的最佳方法是什么。
- 在一个事务中使用 TryPeekAsync,进行处理,然后如果成功则使用 TryDequeueAsync 最终出列。
- 使用 TryDequeueAsync 删除项目,将其放入字典,然后在完成后从字典中删除。在服务启动时,检查 队列前任何待处理的字典。
两种方式都感觉有点不对劲,不知道有没有更好的办法
一个选项是在 RunAsync 中处理队列,大致如下:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var store = await StateManager.GetOrAddAsync<IReliableQueue<T>>("MyStore").ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested)
{
using (var tx = StateManager.CreateTransaction())
{
var itemFromQueue = await store.TryDequeueAsync(tx).ConfigureAwait(false);
if (!itemFromQueue.HasValue)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
continue;
}
// Process item here
// Remmber to clone the dequeued item if it is a custom type and you are going to mutate it.
// If success, await tx.CommitAsync();
// If failure to process, either let it run out of the Using transaction scope, or call tx.Abort();
}
}
}
关于克隆出列项目的评论,如果你要改变它,请查看 "Recommendations" 部分: https://azure.microsoft.com/en-us/documentation/articles/service-fabric-reliable-services-reliable-collections/
可靠集合(队列和字典)的一个限制是每个分区只有 1 个并行度。因此,对于高 activity 队列,它可能不是最佳解决方案。这可能是您 运行 遇到的问题。
我们一直在做的是在写入量非常低的情况下使用 ReliableQueues。对于需要持久性和规模的更高吞吐量队列,我们使用 ServiceBus 主题。这也给了我们一个优势,如果一个服务只是因为有 ReliableQueue 而有状态,那么它现在可以变成无状态的。尽管这增加了对第三方服务(在本例中为 ServiceBus)的依赖性,但这可能不适合您。
另一种选择是创建一个持久的 pub/sub 实现来充当队列。我之前用 actor 做过测试,这似乎是一个可行的选择,没有花太多时间,因为我们没有任何依赖于 ServiceBus 的问题。这是关于
如果速度很慢,请使用 2 个队列。一个快速的队列用于不间断地存储工作,另一个慢的队列用于处理它。 RunAsync 用于将消息从快移动到慢。