System.Fabric.FabricNotPrimaryException 从计时器保存状态时

System.Fabric.FabricNotPrimaryException When saving state from timer

我正在编写一个托管在 Service Fabric 中的有状态服务。该服务的工作是使用来自外部队列的消息,转换它们并将它们放置到我们自己的消息系统中。根据供应商文档,吞吐量可以高达 6k 消息/秒。

我已将服务配置为多个分区以分散消息负载,每个分区有最少 2 个/最多 3 个副本。为了从故障中恢复,我可以订阅供应商队列并传入一个时间戳,我希望从该时间点接收消息。为此,我存储了在服务状态下处理的最后一条消息的时间戳。由于消息量大,我决定在计时器上执行此操作 'save'(并允许下游消息的潜在重复)

这是当时调用的代码:

private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        var saveRetryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(5, retryAttempt =>
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
            );

        await saveRetryPolicy.ExecuteAsync(async () =>
        {
            using (var tx = _stateManager.CreateTransaction())
            {
                var state = await _stateManager.TryGetAsync<IReliableDictionary<string, long>>(TimestampStateName);

                if (state.HasValue)
                {
                    await state.Value.AddOrUpdateAsync(tx, TimestampStateName, _lastTXTimestamp,
                        (s, l) => _lastTXTimestamp);

                    await tx.CommitAsync();
                }
                else
                {
                    var s =
                        await _stateManager.GetOrAddAsync<IReliableDictionary<string, long>>(tx, TimestampStateName);

                    await tx.CommitAsync();
                    _timer_Elapsed(this, null);
                }
            }
        });
    }

每次尝试坚持这一点时,我都会在每个分区上收到 'System.Fabric.FabricNotPrimaryException' 错误。

我已经包含了重试策略(由 Polly Retry 提供),因为有人对类似问题的评论建议这样做。这没有效果,禁止延长报告错误之前的时间。

我是否误解了应该如何使用 SF 的一些基本知识?这对我来说似乎是一个简单的用例。

来自评论的回答:

确保您没有在所有副本上启动计时器,而是仅在主副本上启动计时器。