控制 SelectMany 的最大线程数

Controlling the max thread count of SelectMany

有没有一种方法可以用来设置 IObservable.SelectMany 的最大线程数?

以下代码非常适合在处理项目时更新 UI,但目前我要执行的任务有点占用资源。我想将最大线程设置为两个,以减少资源使用。

AsyncCommand = ReactiveCommand.CreateAsyncObservable(_ => 
{
    // Set progress bar indicator to 0

    var set = new [] {...} // The set of items to process

    // Set the progress bar indicator max to the count of the items to process

    return set
        .ToObservable()
        .SelectMany((item, index) => Task.Run(() =>
        {
            // Process item

            return item;
        }), (item, index, processed) => item); 
});

AsyncCommand
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(item => 
    {
        // Step the progress bar indicator
    });

Merge 具有最大并行度参数:

AsyncCommand = ReactiveCommand.CreateAsyncObservable(_ => 
{
    // Set progress bar indicator to 0

    var set = new [] {...} // The set of items to process

    // Set the progress bar indicator max to the count of the items to process

    return set
        .ToObservable()
        .Select(item => Observable.FromAsync(() => DoAsyncProcess(item))))
        .Merge(2);
});

另请参阅

SelectMany,正如您的代码片段中所使用的那样,它本身不会引入任何并发性,因此不会创建任何线程。这是 TPL 的作用(因为您使用 Task.Run)。 TPL 通常在不创建太多线程来完成工作方面做得很好。如果您真的想限制最大线程数,请查看 here and subsequently here

作为一个简单的替代方案,使用来自 Stephen Cleary 的伟大 AsyncEx 包的 AsyncSemaphore 并将您的处理代码放在 WaitAsyncRelease 调用之间。