如何将 SelectMany 用于 Observable.FromEventPattern 序列上的异步逻辑?

How to use SelectMany for async logic on Observable.FromEventPattern sequence?

我有一个 class 公开了以下可观察序列:

internal IObservable<TaskDoneEventArgs> WhenTaskDone => Observable
    .FromEventPattern<TaskDoneEventHandler, TaskDoneEventArgs>(
            handler => NiTask.Done += handler,
            handler => NiTask.Done -= handler)
    .Select(x => x.EventArgs);

每当从序列中观察到新项目时,我想执行一些异步 TPL 逻辑。据我了解,SelectMany() 是处理异步逻辑的好方法。但是,我无法正确使用语法。

以下是我的尝试,但无法编译:

_output.WhenTaskDone
    .SelectMany(async _ => await StopDelivery())
    .Subscribe(_ => Debug.WriteLine("Delivery stopped"));

我收到以下与 SelectMany() 相关的错误:

  Error CS0411: The type arguments for method 'Observable.SelectMany<TSource, TOther>(IObservable<TSource>, IObservable<TOther>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. (85, 18)

StopDelivery()函数的签名如下:

internal Task StopDelivery()

我在这里遗漏了什么和做错了什么?

如果您想根据 returns 和 Task 的方法将单个操作插入到管道中,以便 Task 在返回原始值之前完成订户,那么你需要这样做:

.SelectMany(_ => Observable.FromAsync(() => StopDelivery()), (x, y) => x)

这是一个完整的工作示例:

void Main()
{
    WhenTaskDone
        .SelectMany(_ => Observable.FromAsync(() => StopDelivery()), (x, y) => x)
        .Subscribe(_ => Debug.WriteLine("Delivery stopped"));

    NiTask.OnDone();
}

private NiTaskClass NiTask = new NiTaskClass();

internal IObservable<TaskDoneEventArgs> WhenTaskDone =>
    Observable
        .FromEventPattern<TaskDoneEventHandler, TaskDoneEventArgs>(
            handler => NiTask.Done += handler,
            handler => NiTask.Done -= handler)
        .Select(x => x.EventArgs);


internal Task StopDelivery() => Task.Run(() => Console.WriteLine("StopDelivery"));

public delegate void TaskDoneEventHandler(object sender, TaskDoneEventArgs e);

public class TaskDoneEventArgs : EventArgs { }

public class NiTaskClass
{
    public event TaskDoneEventHandler Done;
    public void OnDone()
    {
        this.Done?.Invoke(this, new TaskDoneEventArgs());
    }
}

这是在 LINQPad 中组合在一起的 - 一旦您通过 NuGet 添加 System.Reactive,您就可以进行复制和粘贴,这应该 运行 没问题。