如何创建一个既是任务<T>又是IObservable<T>的class?
How can I create a class that is both a Task<T> and an IObservable<T>?
最近我遇到了一种情况,将异步操作同时表示为 Task<T>
和 IObservable<T>
会比较有利。任务表示维护操作的状态(IsCompleted
、IsFaulted
等),而可观察表示能够以有趣的方式组合多个操作(Concat
、Merge
、 Switch
等),自动处理取消订阅的任何操作,以这种方式解决异步操作即发即忘的问题。所以我对结合这两种表示的方法很感兴趣。
组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 Task<T>
和 IObservable<T>
的类型,并将它们公开为它的两个属性。但是在这个问题中,我感兴趣的是 is a Task<T>
and is an 的类型的具有挑战性且可能不切实际的可能性IObservable<T>
同时。一种类型,可以直接传递给接受任务或可观察对象的 API,并在任何一种情况下都做正确的事情。所以它不能只是一个类似任务的对象。它必须继承自真实的东西,即 Task<T>
class 本身。像这样:
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
{
//...
}
}
创建一个 AsyncOperation
实例应该立即调用提供的操作。换句话说,AsyncOperation
应该代表 hot task/observable 组合。
是否可以创建这样的类型?
顺便说一句,这是 ReactiveX/RxJava library that proves that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable
中的一个线程
我找到了一种创建从 Task
继承的可观察对象的方法,方法是使用 @GlennSlayden 在 this 回答中描述的天才技术。
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
private readonly IObservable<TResult> _observable;
private readonly Task<TResult> _promise;
private AsyncOperation(Func<TResult> function) : base(() => function())
=> function = this.GetResult;
private TResult GetResult() => _promise.GetAwaiter().GetResult();
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
: this((Func<TResult>)null)
{
_observable = Observable.StartAsync(action, Scheduler.Immediate);
_promise = _observable.ToTask();
_promise.ContinueWith(_ => base.RunSynchronously(), default,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
=> _observable.Subscribe(observer);
}
上述解决方案并不完美,因为派生的 class 实例永远无法转换为 Canceled
状态。这是一个我不知道如何修复的问题,它可能无法修复,但可能不是很重要。取消以 TaskCanceledException
形式出现,处理此异常是处理已取消任务的正常方式。
有趣的是,可以通过创建虚拟订阅并处理它来取消异步操作:
var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });
operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken
我对此 class 进行了一些试验,发现它不如我最初想象的那么实用。问题是存在许多同时支持任务和可观察对象的 API,并且在其他方面是相同的(例如 Concat
、Merge
、Switch
、Wait
等)。这导致经常出现编译时错误(CS0121 模棱两可的调用)。可以通过将类型转换为任务或可观察对象来解决歧义,但这很尴尬,并且首先否定了将这两种类型组合在一起的整个目的。
澄清: 行 _promise.GetAwaiter().GetResult()
乍一看可能表明此实现阻塞了一个 ThreadPool
线程。事实并非如此,因为基础 Task
最初是冷的,只有在 _promise
完成后才会变热。
最近我遇到了一种情况,将异步操作同时表示为 Task<T>
和 IObservable<T>
会比较有利。任务表示维护操作的状态(IsCompleted
、IsFaulted
等),而可观察表示能够以有趣的方式组合多个操作(Concat
、Merge
、 Switch
等),自动处理取消订阅的任何操作,以这种方式解决异步操作即发即忘的问题。所以我对结合这两种表示的方法很感兴趣。
组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 Task<T>
和 IObservable<T>
的类型,并将它们公开为它的两个属性。但是在这个问题中,我感兴趣的是 is a Task<T>
and is an 的类型的具有挑战性且可能不切实际的可能性IObservable<T>
同时。一种类型,可以直接传递给接受任务或可观察对象的 API,并在任何一种情况下都做正确的事情。所以它不能只是一个类似任务的对象。它必须继承自真实的东西,即 Task<T>
class 本身。像这样:
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
{
//...
}
}
创建一个 AsyncOperation
实例应该立即调用提供的操作。换句话说,AsyncOperation
应该代表 hot task/observable 组合。
是否可以创建这样的类型?
顺便说一句,这是 ReactiveX/RxJava library that proves that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable
中的一个线程我找到了一种创建从 Task
继承的可观察对象的方法,方法是使用 @GlennSlayden 在 this 回答中描述的天才技术。
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
private readonly IObservable<TResult> _observable;
private readonly Task<TResult> _promise;
private AsyncOperation(Func<TResult> function) : base(() => function())
=> function = this.GetResult;
private TResult GetResult() => _promise.GetAwaiter().GetResult();
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
: this((Func<TResult>)null)
{
_observable = Observable.StartAsync(action, Scheduler.Immediate);
_promise = _observable.ToTask();
_promise.ContinueWith(_ => base.RunSynchronously(), default,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
=> _observable.Subscribe(observer);
}
上述解决方案并不完美,因为派生的 class 实例永远无法转换为 Canceled
状态。这是一个我不知道如何修复的问题,它可能无法修复,但可能不是很重要。取消以 TaskCanceledException
形式出现,处理此异常是处理已取消任务的正常方式。
有趣的是,可以通过创建虚拟订阅并处理它来取消异步操作:
var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });
operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken
我对此 class 进行了一些试验,发现它不如我最初想象的那么实用。问题是存在许多同时支持任务和可观察对象的 API,并且在其他方面是相同的(例如 Concat
、Merge
、Switch
、Wait
等)。这导致经常出现编译时错误(CS0121 模棱两可的调用)。可以通过将类型转换为任务或可观察对象来解决歧义,但这很尴尬,并且首先否定了将这两种类型组合在一起的整个目的。
澄清: 行 _promise.GetAwaiter().GetResult()
乍一看可能表明此实现阻塞了一个 ThreadPool
线程。事实并非如此,因为基础 Task
最初是冷的,只有在 _promise
完成后才会变热。