Rx合并运算符和并行执行
Rx merge operator and parallel execution
为什么 .Merge
运算符的行为类似于 .Synchronize
?
我有两个事件流,每个都在不同的线程中发出值(例如使用 Observable.Interval
)。
在使用 .Merge
运算符后,它们在长运算符中相互锁定(合并后只有一个并行运算 运行,就像在 .Synchronize()
运算中一样)。这是为什么?
var xs1 = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(x => $"xs1 : {x}").Log("xs1 generating");
var xs2 = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(x => $"xs1 : {x}").Log("xs2 generating");
xs1.Merge(xs2)
.Do(x=> Thread.Sleep(2_000))
.SubscribeConsoleWithThreads("after long work");
产生这些结果:
13:44:14 thread 4 xs1 generating : "xs1 : 0"
13:44:14 thread 5 xs2 generating : "xs2 : 0"
13:44:16 thread 4 after long work: "xs1 : 0"
13:44:16 thread 4 xs1 generating : "xs1 : 1"
13:44:18 thread 5 after long work: "xs2 : 0"
13:44:18 thread 5 xs2 generating : "xs2 : 1"
13:44:20 thread 4 after long work: "xs1 : 1"
13:44:20 thread 4 xs1 generating : "xs1 : 2"
如您所见,值 "xs1 : 0" 和 "xs2 : 0" 是从两个流中并行生成的,但之后,它们是一个一个生成的。
是否可以在不锁定并行执行的情况下合并到一个 Observable?
这是 Rx 的合同。生成零个或多个值,一次一个,并可选择以单个 OnError
或单个 OnCompleted
.
结束
.Synchronize()
运算符用于强制不合规的 Observable 合规。
调用 xs1.Merge(xs2)
不会 "lock parallel execution"。两个源的执行是相互独立的。 Rx 只是确保一次输出一个值,这样就不会有并发问题。
您的代码存在合并后的计算问题。如果你在合并之前完成你的工作,那么生活会更好。
比较这些:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
xs1.Merge(xs2)
.Do(x=> Thread.Sleep(2_000))
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
这会产生:
1: 0 - 18.875 - 20.877
2: 0 - 18.884 - 22.882
1: 1 - 20.881 - 24.883
2: 1 - 22.882 - 26.884
1: 2 - 24.883 - 28.884
2: 2 - 26.884 - 30.884
1: 3 - 28.884 - 32.886
2: 3 - 30.884 - 34.887
1: 4 - 32.886 - 36.887
2: 4 - 34.887 - 38.887
然后是这个:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
xs1.Merge(xs2)
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
这给出:
1: 0 - 29.178 - 31.182
2: 0 - 29.180 - 31.201
1: 1 - 31.201 - 33.202
2: 1 - 31.201 - 33.202
2: 2 - 33.202 - 35.203
1: 2 - 33.202 - 35.203
1: 3 - 35.203 - 37.204
2: 3 - 35.203 - 37.204
1: 4 - 37.204 - 39.205
2: 4 - 37.204 - 39.205
这是 observable 输出速度的两倍。
为什么 .Merge
运算符的行为类似于 .Synchronize
?
我有两个事件流,每个都在不同的线程中发出值(例如使用 Observable.Interval
)。
在使用 .Merge
运算符后,它们在长运算符中相互锁定(合并后只有一个并行运算 运行,就像在 .Synchronize()
运算中一样)。这是为什么?
var xs1 = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(x => $"xs1 : {x}").Log("xs1 generating");
var xs2 = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(x => $"xs1 : {x}").Log("xs2 generating");
xs1.Merge(xs2)
.Do(x=> Thread.Sleep(2_000))
.SubscribeConsoleWithThreads("after long work");
产生这些结果:
13:44:14 thread 4 xs1 generating : "xs1 : 0"
13:44:14 thread 5 xs2 generating : "xs2 : 0"
13:44:16 thread 4 after long work: "xs1 : 0"
13:44:16 thread 4 xs1 generating : "xs1 : 1"
13:44:18 thread 5 after long work: "xs2 : 0"
13:44:18 thread 5 xs2 generating : "xs2 : 1"
13:44:20 thread 4 after long work: "xs1 : 1"
13:44:20 thread 4 xs1 generating : "xs1 : 2"
如您所见,值 "xs1 : 0" 和 "xs2 : 0" 是从两个流中并行生成的,但之后,它们是一个一个生成的。
是否可以在不锁定并行执行的情况下合并到一个 Observable?
这是 Rx 的合同。生成零个或多个值,一次一个,并可选择以单个 OnError
或单个 OnCompleted
.
.Synchronize()
运算符用于强制不合规的 Observable 合规。
调用 xs1.Merge(xs2)
不会 "lock parallel execution"。两个源的执行是相互独立的。 Rx 只是确保一次输出一个值,这样就不会有并发问题。
您的代码存在合并后的计算问题。如果你在合并之前完成你的工作,那么生活会更好。
比较这些:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
xs1.Merge(xs2)
.Do(x=> Thread.Sleep(2_000))
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
这会产生:
1: 0 - 18.875 - 20.877 2: 0 - 18.884 - 22.882 1: 1 - 20.881 - 24.883 2: 1 - 22.882 - 26.884 1: 2 - 24.883 - 28.884 2: 2 - 26.884 - 30.884 1: 3 - 28.884 - 32.886 2: 3 - 30.884 - 34.887 1: 4 - 32.886 - 36.887 2: 4 - 34.887 - 38.887
然后是这个:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
xs1.Merge(xs2)
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
这给出:
1: 0 - 29.178 - 31.182 2: 0 - 29.180 - 31.201 1: 1 - 31.201 - 33.202 2: 1 - 31.201 - 33.202 2: 2 - 33.202 - 35.203 1: 2 - 33.202 - 35.203 1: 3 - 35.203 - 37.204 2: 3 - 35.203 - 37.204 1: 4 - 37.204 - 39.205 2: 4 - 37.204 - 39.205
这是 observable 输出速度的两倍。