当 Observable 滴答并合并结果时启动任务(使用 Rx.NET)

Starting a task when an Observable ticks and merge results (using Rx.NET)

我想知道是否可以在每次 Observable 滴答时启动一个任务并使用这些任务的结果继续管道。结果的顺序无关紧要。我正在使用 C#。

linksObservable
    .Select(url=> downloadTask(url))
    ...

上面的代码将启动下载 url 的任务,但我如何获得可用的结果(即 downloadTask 已完成)。另一个考虑因素是来自任务的Exceptions。任何异常都不应影响其余任务。

好吧,如果 downloadTask returns 你能做点什么

observable
    .SelectMany(url => downloadTask(url))
    .Subscribe(result => Console.WriteLine(result));

完整示例:

void Main()
{
    var observable = new Subject<string>();

    observable
        .SelectMany(url => downloadTask(url))
        .Subscribe(result => Console.WriteLine(result));

    observable.OnNext("a");
    observable.OnNext("b");
    observable.OnNext("c");
    observable.OnNext("d");
}

public async Task<string> downloadTask(string s)
{
    await Task.Delay(1000);
    return s;
}