如何创建一个既是任务<T>又是IObservable<T>的class?

How can I create a class that is both a Task<T> and an IObservable<T>?

最近我遇到了一种情况,将异步操作同时表示为 Task<T>IObservable<T> 会比较有利。任务表示维护操作的状态(IsCompletedIsFaulted 等),而可观察表示能够以有趣的方式组合多个操作(ConcatMergeSwitch 等),自动处理取消订阅的任何操作,以这种方式解决异步操作即发即忘的问题。所以我对结合这两种表示的方法很感兴趣。

组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 Task<T>IObservable<T> 的类型,并将它们公开为它的两个属性。但是在这个问题中,我感兴趣的是 is a Task<T> and is an 的类型的具有挑战性且可能不切实际的可能性IObservable<T>同时。一种类型,可以直接传递给接受任务或可观察对象的 API,并在任何一种情况下都做正确的事情。所以它不能只是一个类似任务的对象。它必须继承自真实的东西,即 Task<T> class 本身。像这样:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

创建一个 AsyncOperation 实例应该立即调用提供的操作。换句话说,AsyncOperation 应该代表 hot task/observable 组合。

是否可以创建这样的类型?

顺便说一句,这是 ReactiveX/RxJava library that proves that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable

中的一个线程

我找到了一种创建从 Task 继承的可观​​察对象的方法,方法是使用 @GlennSlayden 在 this 回答中描述的天才技术。

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}

上述解决方案并不完美,因为派生的 class 实例永远无法转换为 Canceled 状态。这是一个我不知道如何修复的问题,它可能无法修复,但可能不是很重要。取消以 TaskCanceledException 形式出现,处理此异常是处理已取消任务的正常方式。

有趣的是,可以通过创建虚拟订阅并处理它来取消异步操作:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

我对此 class 进行了一些试验,发现它不如我最初想象的那么实用。问题是存在许多同时支持任务和可观察对象的 API,并且在其他方​​面是相同的(例如 ConcatMergeSwitchWait 等)。这导致经常出现编译时错误(CS0121 模棱两可的调用)。可以通过将类型转换为任务或可观察对象来解决歧义,但这很尴尬,并且首先否定了将这两种类型组合在一起的整个目的。


澄清:_promise.GetAwaiter().GetResult() 乍一看可能表明此实现阻塞了一个 ThreadPool 线程。事实并非如此,因为基础 Task 最初是冷的,只有在 _promise 完成后才会变热。