如何从 Observable 序列计算 ETA?

How to calculate an ETA from an Observable sequence?

我有一个方法可以在给定时间向我发送完成百分比更新。

public Task MyMethod(IObserver<double> progress)
{
   ...
}

我的消费者这样做:

ISubject<double> progressObserver = new Subject<double>();
await MyMethod(progressObserver);

已订阅subject以关注更新:

progressObserver.Subscribe(percent => Console.WriteLine(percent));

效果不错,但我想计算 ETA(预计完成时间)。我知道可以考虑时间和百分比来计算,但是如何计算?

肯定有一种优雅的方法可以使用 Observables (System.Reactive),也许可以利用最后的 n 百分比通知和它们之间经过的时间来估计 100 % 将完成。

但是,对不起,我不知道如何优雅地完成它。

它可能看起来像这样:

var secondsRemaining = progressObservable
    .Timestamp()
    .Buffer(5, 1)
    .Select(l => ((l[4].Timestamp - l[0].Timestamp).TotalMilliseconds / (l[4].Value - l[0].Value)) * (100 - l[4].Value))
    .Select(msRemaining => msRemaining / 1000);

解释:

对于每个进度更新,

  • .Buffer(5, 1) 发布了最近 5 次进度更新的列表。
  • .Timestamp() 将时间戳钉在每一个上。
  • 第一个 Select 运算符计算最远时间戳和最近时间戳之间的毫秒数,将其除以取得的进度,然后将其乘以剩余进度,输出剩余毫秒数。
  • 最后一个Select除以1000达到剩余秒数

以下是我对此的看法:

IObservable<Timestamped<double>> estimatedCompletion =
    progressObservable
        .Timestamp()
        .Buffer(2, 1)
        .Where(x => x.Count() == 2)
        .Scan((a, b) => a.Take(1).Concat(b).Take(1).Concat(b.Skip(1)).ToList())
        .Select(x => new
        {
            current = x[1].Value,
            delta = x[1].Timestamp.Subtract(x[0].Timestamp),
        })
        .Select(x => new
        {
            x.current,
            rate = x.current / x.delta.TotalSeconds,
        })
        .Select(x => new
        {
            x.current,
            estimated = DateTimeOffset.Now.AddSeconds((1.0 - x.current) / x.rate),
        })
        .Select(x => new Timestamped<double>(x.current, x.estimated));

这会产生一个 IObservable<Timestamped<double>>,其中时间戳是估计的 DateTimeOffset,可观察量将达到 1.0(或 100%)。

关键是它使用 .Buffer(2, 1).Where(x => x.Count() == 2) 来生成值对,因为可观察对象会生成值,然后它使用看似复杂的 .Scan((a, b) => a.Take(1).Concat(b).Take(1).Concat(b.Skip(1)).ToList()) 来始终生成一对第一个值与最新一期。

然后它只是简单地通过一系列步骤进行估算计算。

因为原始序列是从 0.0 到 1.0,所以这将最准确地磨合到最后一次。这只是一个估计值,但如果到达那里的步骤相当一致,那么这将是相当准确的。

您可以使用此代码进行测试:

var rnd = new Random();
var progressObservable = Observable.Generate(0, x => x <= 100, x => x + 1, x => x / 100.0, x => TimeSpan.FromSeconds(rnd.NextDouble()));