System.Fabric.FabricNotPrimaryException 从计时器保存状态时
System.Fabric.FabricNotPrimaryException When saving state from timer
我正在编写一个托管在 Service Fabric 中的有状态服务。该服务的工作是使用来自外部队列的消息,转换它们并将它们放置到我们自己的消息系统中。根据供应商文档,吞吐量可以高达 6k 消息/秒。
我已将服务配置为多个分区以分散消息负载,每个分区有最少 2 个/最多 3 个副本。为了从故障中恢复,我可以订阅供应商队列并传入一个时间戳,我希望从该时间点接收消息。为此,我存储了在服务状态下处理的最后一条消息的时间戳。由于消息量大,我决定在计时器上执行此操作 'save'(并允许下游消息的潜在重复)
这是当时调用的代码:
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
var saveRetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(5, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
);
await saveRetryPolicy.ExecuteAsync(async () =>
{
using (var tx = _stateManager.CreateTransaction())
{
var state = await _stateManager.TryGetAsync<IReliableDictionary<string, long>>(TimestampStateName);
if (state.HasValue)
{
await state.Value.AddOrUpdateAsync(tx, TimestampStateName, _lastTXTimestamp,
(s, l) => _lastTXTimestamp);
await tx.CommitAsync();
}
else
{
var s =
await _stateManager.GetOrAddAsync<IReliableDictionary<string, long>>(tx, TimestampStateName);
await tx.CommitAsync();
_timer_Elapsed(this, null);
}
}
});
}
每次尝试坚持这一点时,我都会在每个分区上收到 'System.Fabric.FabricNotPrimaryException' 错误。
我已经包含了重试策略(由 Polly Retry 提供),因为有人对类似问题的评论建议这样做。这没有效果,禁止延长报告错误之前的时间。
我是否误解了应该如何使用 SF 的一些基本知识?这对我来说似乎是一个简单的用例。
来自评论的回答:
确保您没有在所有副本上启动计时器,而是仅在主副本上启动计时器。
我正在编写一个托管在 Service Fabric 中的有状态服务。该服务的工作是使用来自外部队列的消息,转换它们并将它们放置到我们自己的消息系统中。根据供应商文档,吞吐量可以高达 6k 消息/秒。
我已将服务配置为多个分区以分散消息负载,每个分区有最少 2 个/最多 3 个副本。为了从故障中恢复,我可以订阅供应商队列并传入一个时间戳,我希望从该时间点接收消息。为此,我存储了在服务状态下处理的最后一条消息的时间戳。由于消息量大,我决定在计时器上执行此操作 'save'(并允许下游消息的潜在重复)
这是当时调用的代码:
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
var saveRetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(5, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
);
await saveRetryPolicy.ExecuteAsync(async () =>
{
using (var tx = _stateManager.CreateTransaction())
{
var state = await _stateManager.TryGetAsync<IReliableDictionary<string, long>>(TimestampStateName);
if (state.HasValue)
{
await state.Value.AddOrUpdateAsync(tx, TimestampStateName, _lastTXTimestamp,
(s, l) => _lastTXTimestamp);
await tx.CommitAsync();
}
else
{
var s =
await _stateManager.GetOrAddAsync<IReliableDictionary<string, long>>(tx, TimestampStateName);
await tx.CommitAsync();
_timer_Elapsed(this, null);
}
}
});
}
每次尝试坚持这一点时,我都会在每个分区上收到 'System.Fabric.FabricNotPrimaryException' 错误。
我已经包含了重试策略(由 Polly Retry 提供),因为有人对类似问题的评论建议这样做。这没有效果,禁止延长报告错误之前的时间。
我是否误解了应该如何使用 SF 的一些基本知识?这对我来说似乎是一个简单的用例。
来自评论的回答:
确保您没有在所有副本上启动计时器,而是仅在主副本上启动计时器。