使用 System.Observable 将长阻塞调用卸载到单独的线程

Offloading long blocking calls to separate threads using System.Observable

我想在单独的线程中执行阻塞方法。为了说明这种情况,我创建了这个例子:

int LongBlockingCall(int n)
{
    Thread.Sleep(1000);
    return n + 1;
}

var observable = Observable
.Range(0, 10)
    .SelectMany(n => 
        Observable.Defer(() => Observable.Return(LongBlockingCall(n), NewThreadScheduler.Default)));
    
var results = await observable.ToList();

我预计这会在 ~1 秒内达到 运行,但它需要 ~10 秒。 每次调用都应该生成一个新线程,因为我指定了 NewThreadScheduler.Default,对吗?

我做错了什么?

你必须替换这一行:

Observable.Return(LongBlockingCall(n), NewThreadScheduler.Default)

...有了这个:

Observable.Start(() => LongBlockingCall(n), NewThreadScheduler.Default)

Observable.Return just returns a value. It's not aware how this value is produced. In order to invoke an action on a specific scheduler, you need the Observable.Start 方法。