为什么 IEnumerable.ToObservable 这么慢?

Why is IEnumerable.ToObservable so slow?

我正在尝试枚举一个大的 IEnumerable once, and observe the enumeration with various operators attached (Count, Sum, Average etc). The obvious way is to transform it to an IObservable with the method ToObservable,然后为它订阅一个观察者。我注意到这比其他方法慢得多,比如执行一个简单的循环并在每次迭代时通知观察者,或者使用 Observable.Create 方法而不是 ToObservable。差别很大:它慢了 20-30 倍。原来如此,还是我做错了什么?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

输出:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0,C# 8,System.Reactive 4.3.2,Windows 10,控制台应用程序,内置版本


更新:这是我想要实现的实际功能的示例:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

输出:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

与使用标准 LINQ 运算符相比,此方法的重要区别在于源可枚举仅被枚举一次。


再观察一下: 使用 ToObservable(Scheduler.Immediate)ToObservable().

稍快(大约 20%)

这是行为良好的可观察对象与 "roll-your-own-because-you-think-faster-is-better-but-it-is-not" 可观察对象之间的区别。

当你深入源代码时,你会发现这条可爱的小线:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

每个计划的递归迭代有效地调用 hasNext = enumerator.MoveNext(); 一次。

这允许您为 .ToObservable(schedulerOfYourChoice) 呼叫选择调度程序。

使用您选择的其他选项,您创建了一系列对 .OnNext 的基本调用,实际上什么都不做。 Method2 甚至没有 .Subscribe 电话。

Method2Method1 运行 都使用当前线程,并且 运行 在订阅完成之前完成。他们正在阻止呼叫。它们会导致竞争条件。

Method1 是唯一一个表现良好的可观察对象。它是异步的,并且可以 运行 独立于订阅者。

请记住,可观察对象是 运行 随着时间推移的集合。它们通常具有异步源或定时器或对外部刺激的响应。他们通常不会 运行 脱离普通的可枚举。如果您正在使用可枚举对象,那么同步工作应该 运行 更快。

速度不是 Rx 的目标。目标是对基于时间的推送值执行复杂查询。

因为 Subject 什么都不做。

看起来循环语句的性能在两种情况下是不同的:

for(int i=0;i<1000000;i++)
    total++;

for(int i=0;i<1000000;i++)
    DoHeavyJob();

如果使用另一个 Subject,并使用较慢的 OnNext 实现,结果会更容易接受

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            //do a job which spend 3ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

输出

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

ToObservable 支持System.Reactive.Concurrency.IScheduler

这意味着您可以实施自己的 IScheduler 并决定何时 运行 每个任务

希望对您有所帮助

此致