时移序列恢复1分钟前数据

Restoring data 1 minute ago for time-shifted sequence

这个 class 累积值 + 知道当前时刻当前总和与 1 分钟前总和之间的差异。它的客户端以这样的方式使用它:为每个传入的数据块添加新值并获得差异。现在,恢复其状态存在问题。假设应用程序被回收,前一分钟泵中的数据丢失,回收后的第一分钟 Change 将等于 0,所以我必须等待一分钟才能计算差异。如何解决?

public class ChangeEstimator
{
    private int sum;

    private Subject<int> sumPump;

    private IConnectableObservable<int> hotSumPump;

    public int Sum
    {
        get
        {
            return sum;
        }

        private set
        {
            sum = value;
            sumPump.OnNext(value);
        }
    }

    public int Change { get; private set; }

    public void Start()
    {
        sumPump = new Subject<int>();
        hotSumPump = sumPump.Publish();

        var changePeriod = TimeSpan.FromMinutes(1);
        hotSumPump.Delay(changePeriod)
                  .Subscribe(value =>
                  {
                      Change = Sum - value;
                  });
        hotSumPump.Connect();
    }

    public void AddNewValue(int newValue)
    {
        Sum += newValue;
    }
}

更新

在下面的代码中你可以看到解释。客户端订阅交易流,并且对于每个新交易,它都会更新估算器。此外,客户端公开 IObservable 快照源,将数据快照推送给可以是 UI 或数据库的侦听器。问题是当回收发生时,UI 将显示为 0 而不是真正的变化。如果这个问题对于 Whosebug 来说太具体,请原谅我。我被建议使用 RabbitMQ 来保持更改的持久性。你认为它可以解决这个问题吗?

public class Transaction
{
    public int Price { get; set; }
}

public class AlgorithmResult
{
    public int Change { get; set; }
}

public interface ITransactionProvider
{
    IObservable<Transaction> TransactionStream { get; }
}

public class Client
{
    private ChangeEstimator estimator = new ChangeEstimator();

    private ITransactionProvider transactionProvider;

    public Client(ITransactionProvider transactionProvider)
    {
        this.transactionProvider = transactionProvider;
    }

    public void Init()
    {
        transactionProvider.TransactionStream.Subscribe(t =>
        {
            estimator.AddNewValue(t.Price);
        });
    }

    public IObservable<AlgorithmResult> CreateSnaphotsTimedSource(int periodSeconds)
    {
        return Observable
            .Interval(TimeSpan.FromSeconds(periodSeconds))
            .Select(_ =>
            {
                AlgorithmResult snapshot;
                snapshot = new AlgorithmResult
                {
                    Change = estimator.Change
                };
                return snapshot;
            })
            .Where(snapshot => snapshot != null);
    }
}

您的应用程序重新启动并且没有它以前生活的记忆(双关语意)。没有 Rx 技巧(在此应用程序中)可以帮助您。

正如所讨论的,您应该弄清楚业务需求并考虑在启动期间进行状态初始化。 您可能需要考虑通过 I/O 源存储最新状态或在消息发送者和消费者之间分离应用程序逻辑以实现队列。

我必须回答我自己的问题,因为我从某人那里得到了答案,这对我来说很有效。我同意正确答案取决于业务逻辑,我想我已经尽可能清楚地解释了它。

因此,这里处理可能的应用程序回收的正确方法是将 class ChangeEstimator 放入外部进程并与其交换消息。 我使用 AMQP 将消息发送到估算器 (RabbitMQ)。这里的关键点是,与包含其余部分的 Web 应用程序相比,外部进程 close/recycle 的风险非常小。