是否可以将 Rx "Using" 运算符与 IAsyncDisposable 一起使用?

Is it possible to use Rx "Using" operator with IAsyncDisposable?

是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable 而不是 IDisposable 的资源一起使用?如果没有,我可以使用某种解决方法吗?

这里是一个 Using 方法,适用于 IAsyncDisposable 个对象:

/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IObservable<TResult>> observableFactory)
    where TResource : IAsyncDisposable
{
    return Observable.Defer(() =>
    {
        TResource resource = resourceFactory();
        IObservable<TResult> observable;
        try { observable = observableFactory(resource); }
        catch (Exception ex) { observable = Observable.Throw<TResult>(ex); }

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
    });
}

此方法与Rx Observable.Using方法具有相同的签名(除了where子句),并且可以以相同的方式使用。

此实现处理所有完成情况:

  1. 成功完成:IAsyncDisposable 资源由 Concat 运算符异步处理。
  2. 完成但出现错误:IAsyncDisposable 资源由 Catch 运算符异步处理。
  3. 序列在完成之前取消订阅:IAsyncDisposable 资源由 Finally 运算符同步处理。在这种情况下,异步处理资源是不可能的,原因已解释 .

异步工厂方法的变体:

public static IObservable<TResult> Using<TResult, TResource>(
    Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
    Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
    where TResource : IAsyncDisposable
{
    return Observable.Create<TResult>(async (observer, cancellationToken) =>
    {
        TResource resource = await resourceFactoryAsync(cancellationToken);
        IObservable<TResult> observable;
        try { observable = await observableFactoryAsync(resource, cancellationToken); }
        catch { await resource.DisposeAsync(); throw; }

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
            .Subscribe(observer);
    });
}