如何从 long 运行 操作中获取中间结果?

How to get intermediate results from long running operation?

采取以下class并假设Calculate是一个计算密集型函数。

class Algorithm
{
    FinalResultObject Calculate()
    {
        longPartialCalculation();
        //signal to caller that that part is ready of type MidResult1
        morePartialCalculation();
        //signal more is ready, different type of  MidResult2
        moreWork();
        return finalResult;             
    }
}

现在假设,中间结果需要随时显示给用户。

我看到的选项是:

  1. 使用单独的事件发出信号

  2. 使用构造函数注入注入一个处理程序class,其方法被调用

  3. 使用 RX observables

我是 RX 的新手,但我喜欢我可以轻松地在 UI 线程上进行事件处理的想法。我想知道这是否矫枉过正而不是预期的那样,因为它不是真正的整个数据流,而是每个可观察对象的一个​​结果。另一方面,尽管与事件一样订阅和取消订阅似乎很麻烦。

有什么提示吗?

解决这个问题的 Rx 方法是定义一个 cold observable 如下:

IObservable<Result> Calculate(IScheduler scheduler)
{
  return Observable.Create<Result>(observer => 
    scheduler.Schedule(() =>
    {
       observer.OnNext(longPartialCalculation());
       observer.OnNext(morePartialCalculation());
       observer.OnNext(moreWork());
       observer.OnCompleted();
    }));
}

// Depending upon your needs, you could use inheritance as follows:
public abstract class Result { ... }
public class MidResult1 : Result { ... }
public class MidResult2 : Result { ... }
public class FinalResultObject : Result { ... }

您还可以定义一个指定默认调度程序的重载,例如 ThreadPoolScheduler 如果您想引入并发,或者 CurrentThreadScheduler 如果您不想引入并发。

要使用此方法返回的可观察对象,只需通过观察者调用 Subscribe。您可以提供一个 OnNext 处理程序来检查每个到达的 Result 对象,并提供一个 OnCompleted 处理程序来处理完成。如果必须,您还可以提供一个 OnError 处理程序来处理 Exception(编辑:请注意,尽管我的示例未调用 OnError。)

如果您想确保所有这些处理程序都在 UI 线程上执行,并且您已经将引入并发的调度程序(例如 ThreadPoolScheduler 传递给 Calculate方法,那么您还可以应用 ObserveOn 运算符(或 ObserveOnDispatcher 在基于 XAML 的平台上)将所有通知编组到 UI 线程以供观察。

algo.Calculate(ThreadPoolScheduler.Instance)
    .ObserveOnDispatcher()
    .Subscribe(OnNextResult, OnCompleted);

请注意,Rx 的主要优势之一是查询能力;例如,一个简单的过滤器:

algo.Calculate(ThreadPoolScheduler.Instance)
    .Where(result => result.HasRequiredState)
    .ObserveOnDispatcher()
    .Subscribe(result => handle(result.RequiredState));

您可以使用 .Net 的 Progress<T>。您创建一个实例,传递一个处理程序或注册到它的事件并在整个 long-运行 过程中通过它进行报告:

var progress = new Progress<string>(value => Console.WriteLine(value));
Calculate(progress);

FinalResultObject Calculate(IProgress<string> progress)
{
    longPartialCalculation();
    progress.Report("MidResult1");
    morePartialCalculation();
    progress.Report("MidResult2");
    moreWork();
    return finalResult;
}

在这种情况下,报告正在向控制台写入一个字符串,但您当然可以使用任何您想要的类型。

Progress<T> 还在创建时捕获当前 SynchronizationContext,因此您可以在 UI 线程中创建它,将它传递给非 UI 线程而不进行任何同步问题。