为什么 IEnumerable.ToObservable 这么慢?
Why is IEnumerable.ToObservable so slow?
我正在尝试枚举一个大的 IEnumerable
once, and observe the enumeration with various operators attached (Count
, Sum
, Average
etc). The obvious way is to transform it to an IObservable
with the method ToObservable
,然后为它订阅一个观察者。我注意到这比其他方法慢得多,比如执行一个简单的循环并在每次迭代时通知观察者,或者使用 Observable.Create
方法而不是 ToObservable
。差别很大:它慢了 20-30 倍。原来如此,还是我做错了什么?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
输出:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0,C# 8,System.Reactive 4.3.2,Windows 10,控制台应用程序,内置版本
更新:这是我想要实现的实际功能的示例:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
输出:
Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5
与使用标准 LINQ 运算符相比,此方法的重要区别在于源可枚举仅被枚举一次。
再观察一下: 使用 ToObservable(Scheduler.Immediate)
比 ToObservable()
.
稍快(大约 20%)
这是行为良好的可观察对象与 "roll-your-own-because-you-think-faster-is-better-but-it-is-not" 可观察对象之间的区别。
当你深入源代码时,你会发现这条可爱的小线:
scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));
每个计划的递归迭代有效地调用 hasNext = enumerator.MoveNext();
一次。
这允许您为 .ToObservable(schedulerOfYourChoice)
呼叫选择调度程序。
使用您选择的其他选项,您创建了一系列对 .OnNext
的基本调用,实际上什么都不做。 Method2
甚至没有 .Subscribe
电话。
Method2
和 Method1
运行 都使用当前线程,并且 运行 在订阅完成之前完成。他们正在阻止呼叫。它们会导致竞争条件。
Method1
是唯一一个表现良好的可观察对象。它是异步的,并且可以 运行 独立于订阅者。
请记住,可观察对象是 运行 随着时间推移的集合。它们通常具有异步源或定时器或对外部刺激的响应。他们通常不会 运行 脱离普通的可枚举。如果您正在使用可枚举对象,那么同步工作应该 运行 更快。
速度不是 Rx 的目标。目标是对基于时间的推送值执行复杂查询。
因为 Subject 什么都不做。
看起来循环语句的性能在两种情况下是不同的:
for(int i=0;i<1000000;i++)
total++;
或
for(int i=0;i<1000000;i++)
DoHeavyJob();
如果使用另一个 Subject,并使用较慢的 OnNext 实现,结果会更容易接受
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 100;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
class My_Slow_Subject : SubjectBase<int>
{
public override void OnNext(int value)
{
//do a job which spend 3ms
System.Threading.Thread.Sleep(3);
}
bool _disposed;
public override bool IsDisposed => _disposed;
public override void Dispose() => _disposed = true;
public override void OnCompleted() { }
public override void OnError(Exception error) { }
public override bool HasObservers => false;
public override IDisposable Subscribe(IObserver<int> observer)
=> throw new NotImplementedException();
}
static SubjectBase<int> CreateSubject()
{
return new My_Slow_Subject();
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
输出
ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec
ToObservable 支持System.Reactive.Concurrency.IScheduler
这意味着您可以实施自己的 IScheduler 并决定何时 运行 每个任务
希望对您有所帮助
此致
我正在尝试枚举一个大的 IEnumerable
once, and observe the enumeration with various operators attached (Count
, Sum
, Average
etc). The obvious way is to transform it to an IObservable
with the method ToObservable
,然后为它订阅一个观察者。我注意到这比其他方法慢得多,比如执行一个简单的循环并在每次迭代时通知观察者,或者使用 Observable.Create
方法而不是 ToObservable
。差别很大:它慢了 20-30 倍。原来如此,还是我做错了什么?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
输出:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0,C# 8,System.Reactive 4.3.2,Windows 10,控制台应用程序,内置版本
更新:这是我想要实现的实际功能的示例:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
输出:
Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5
与使用标准 LINQ 运算符相比,此方法的重要区别在于源可枚举仅被枚举一次。
再观察一下: 使用 ToObservable(Scheduler.Immediate)
比 ToObservable()
.
这是行为良好的可观察对象与 "roll-your-own-because-you-think-faster-is-better-but-it-is-not" 可观察对象之间的区别。
当你深入源代码时,你会发现这条可爱的小线:
scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));
每个计划的递归迭代有效地调用 hasNext = enumerator.MoveNext();
一次。
这允许您为 .ToObservable(schedulerOfYourChoice)
呼叫选择调度程序。
使用您选择的其他选项,您创建了一系列对 .OnNext
的基本调用,实际上什么都不做。 Method2
甚至没有 .Subscribe
电话。
Method2
和 Method1
运行 都使用当前线程,并且 运行 在订阅完成之前完成。他们正在阻止呼叫。它们会导致竞争条件。
Method1
是唯一一个表现良好的可观察对象。它是异步的,并且可以 运行 独立于订阅者。
请记住,可观察对象是 运行 随着时间推移的集合。它们通常具有异步源或定时器或对外部刺激的响应。他们通常不会 运行 脱离普通的可枚举。如果您正在使用可枚举对象,那么同步工作应该 运行 更快。
速度不是 Rx 的目标。目标是对基于时间的推送值执行复杂查询。
因为 Subject 什么都不做。
看起来循环语句的性能在两种情况下是不同的:
for(int i=0;i<1000000;i++)
total++;
或
for(int i=0;i<1000000;i++)
DoHeavyJob();
如果使用另一个 Subject,并使用较慢的 OnNext 实现,结果会更容易接受
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 100;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
class My_Slow_Subject : SubjectBase<int>
{
public override void OnNext(int value)
{
//do a job which spend 3ms
System.Threading.Thread.Sleep(3);
}
bool _disposed;
public override bool IsDisposed => _disposed;
public override void Dispose() => _disposed = true;
public override void OnCompleted() { }
public override void OnError(Exception error) { }
public override bool HasObservers => false;
public override IDisposable Subscribe(IObserver<int> observer)
=> throw new NotImplementedException();
}
static SubjectBase<int> CreateSubject()
{
return new My_Slow_Subject();
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = CreateSubject();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
输出
ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec
ToObservable 支持System.Reactive.Concurrency.IScheduler
这意味着您可以实施自己的 IScheduler 并决定何时 运行 每个任务
希望对您有所帮助
此致