Rx.Observable.reduce takeUntil concat 的意外行为
Unexpected behavior of Rx.Observable.reduce takeUntil concat
我对以下代码的行为感到惊讶(参见 https://plnkr.co/edit/OVc26DmXpvXqSOJsQAoh?p=preview):
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
产量
Next: source 1: 0
Next: source 1: 1
--- 此处停顿很长---
Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
但我曾希望看到 所有 序列出现在中间。出了什么问题?
编辑:
请注意,使用 share()
并不总是能治愈它。此代码失败:
let originalSequence = Observable.timer(0, 1000).take(10).share();
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
originalSequence.delay(i * 2000).map(x => "source " + i + ": " + x))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
并且此代码按我预期的方式工作,但我不明白为什么
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10).share())
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
编辑 2:
C# 版本也有我不期望的行为,但同时表现不同:
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
namespace RxScanProblem
{
class Program
{
static void Main(string[] args)
{
var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Aggregate(empty, (s1, s2) => s1.TakeUntil(s2).Concat(s2))
.SelectMany(x => x);
source.Subscribe(
s => Console.WriteLine("Next: " + s),
ex => Console.WriteLine("Error: " + ex.Message),
() => Console.WriteLine("Completed"));
originalSequence.Connect();
// Dirty, I know
Thread.Sleep(20000);
}
}
}
产量(有一些延迟)
Next: source 1: 0
Next: source 1: 1
Next: source 1: 2
编辑 3
而且 switch() 的行为与我预期的不一样!
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i => Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.switch();
产量
Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
C# 的相同 (!) 行为
var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Switch();
让我们看看您的代码做了什么。
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
第一张地图会将 {1,2,3,4,5}
转换为
s1 = Observable.timer(1 * 2000, 1000).map(x => "source 1: " + x).take(10));
s2 = Observable.timer(2 * 2000, 1000).map(x => "source 2: " + x).take(10));
s3 = Observable.timer(3 * 2000, 1000).map(x => "source 3: " + x).take(10));
s4 = Observable.timer(4 * 2000, 1000).map(x => "source 4: " + x).take(10));
s5 = Observable.timer(5 * 2000, 1000).map(x => "source 5: " + x).take(10));
接下来,reduce
会像这样把它们粘在一起:
s1.takeUntil(s2).concat(s2)
.takeUntil(s3).concat(s3)
.takeUntil(s4).concat(s4)
.takeUntil(s5).concat(s5)
现在让我们写一个小弹珠来显示所有这些流将产生什么:
s1 --0123456789
s2 ----0123456789
s3 ------0123456789
s4 --------0123456789
s5 ----------0123456789
s1.takeUntil(s2) --01|
.concat(s2) --01----0123456789
takeUntil(s3) --01--|
.concat(s3) --01--------0123456789
takeUntil(s4) --01----|
.concat(s4) --01------------0123456789
takeUntil(s5) --01------|
.concat(s5) --01----------------0123456789
现在,如果您使用 share()
,您可以有效地发布源代码。发布意味着您同时向所有订阅者进行多播。如果有 2 个订阅者,这会很好地工作,即使一个订阅者比另一个订阅者晚到达,源将继续为第二个订阅者提供中流。当第一个订阅者在第二个订阅者到达之前断开连接时,情况就会发生变化。为了保护资源,share()
将断开源,稍后重新订阅。鉴于您从冷可观察对象开始,这意味着它们将在漫长的等待中重新开始。
由于您使用 .takeUntil(s2).concat(s2)
,您实际上会在再次订阅 s2
之前取消订阅 s2
。毕竟 concat
在从 takeUntil
接收到 completed
之前不会连接,并且 takeUntil
在 s2
产生之前不会发出 completed
。如果 s2
屈服,takeUntil 将在转发 completed
之前立即取消订阅。这意味着 s2 将在一瞬间没有订阅者,源将被重置。
您可能期望的是 s2
会一直保持连接状态,并会在后台保持 运行ning。如果您使用的是来自活动源的热可观察对象而不是冷可观察对象制成的热槽,这将起作用 share()
.
我不会详细介绍 switch()
因为我相信你已经理解了那里的问题:它会在下一个到达时断开前一个源,而不是在下一个产生时断开。
你能做的,就是写你自己的'switchOnYield'
source.publish(src => src
.flatMap(inner1 =>
inner1.takeUntil(src.flatMap(inner2 => inner2.take(1)))
))
这样做是将 source 中的所有来源合并在一起,但在它们上面添加 takeUntil
和所有后面的来源。如果任何后来的资源产生,第一批将被取消订阅。这是有效的,因为第一次 src
产生,.flatMap(inner1
将 运行。第二次 yield,src.flatMap(inner2
会将后面源中的任何项目合并到 takeUntil
运算符中。
我强烈建议您 post 预期或期望的输出。目前尚不清楚你想要什么。对于 C# 示例,将源代码更改为以下内容会让您更接近(我认为,我不确定):
var source = Observable.Range(1, 5)
.Select(i => originalSequence
.Delay(TimeSpan.FromSeconds(2 * i))
.Select(x => "source " + i + ": " + x)
)
//making sure s2 is shared properly, thus concated properly
.Aggregate(empty, (s1, s2) => s2.Publish( _s2 => s1
.TakeUntil(_s2)
.Concat(_s2)
))
.SelectMany(x => x);
这会产生以下输出:
Next: source 1: 0
Next: source 1: 1
Next: source 2: 1
Next: source 3: 1
Next: source 4: 1
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
这对我来说很有意义。如果你能 post 想要的输出,我会帮助你。
我对以下代码的行为感到惊讶(参见 https://plnkr.co/edit/OVc26DmXpvXqSOJsQAoh?p=preview):
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
产量
Next: source 1: 0
Next: source 1: 1
--- 此处停顿很长---
Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
但我曾希望看到 所有 序列出现在中间。出了什么问题?
编辑:
请注意,使用 share()
并不总是能治愈它。此代码失败:
let originalSequence = Observable.timer(0, 1000).take(10).share();
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
originalSequence.delay(i * 2000).map(x => "source " + i + ": " + x))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
并且此代码按我预期的方式工作,但我不明白为什么
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10).share())
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
编辑 2:
C# 版本也有我不期望的行为,但同时表现不同:
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
namespace RxScanProblem
{
class Program
{
static void Main(string[] args)
{
var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Aggregate(empty, (s1, s2) => s1.TakeUntil(s2).Concat(s2))
.SelectMany(x => x);
source.Subscribe(
s => Console.WriteLine("Next: " + s),
ex => Console.WriteLine("Error: " + ex.Message),
() => Console.WriteLine("Completed"));
originalSequence.Connect();
// Dirty, I know
Thread.Sleep(20000);
}
}
}
产量(有一些延迟)
Next: source 1: 0
Next: source 1: 1
Next: source 1: 2
编辑 3
而且 switch() 的行为与我预期的不一样!
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i => Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.switch();
产量
Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
C# 的相同 (!) 行为
var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Switch();
让我们看看您的代码做了什么。
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
第一张地图会将 {1,2,3,4,5}
转换为
s1 = Observable.timer(1 * 2000, 1000).map(x => "source 1: " + x).take(10));
s2 = Observable.timer(2 * 2000, 1000).map(x => "source 2: " + x).take(10));
s3 = Observable.timer(3 * 2000, 1000).map(x => "source 3: " + x).take(10));
s4 = Observable.timer(4 * 2000, 1000).map(x => "source 4: " + x).take(10));
s5 = Observable.timer(5 * 2000, 1000).map(x => "source 5: " + x).take(10));
接下来,reduce
会像这样把它们粘在一起:
s1.takeUntil(s2).concat(s2)
.takeUntil(s3).concat(s3)
.takeUntil(s4).concat(s4)
.takeUntil(s5).concat(s5)
现在让我们写一个小弹珠来显示所有这些流将产生什么:
s1 --0123456789
s2 ----0123456789
s3 ------0123456789
s4 --------0123456789
s5 ----------0123456789
s1.takeUntil(s2) --01|
.concat(s2) --01----0123456789
takeUntil(s3) --01--|
.concat(s3) --01--------0123456789
takeUntil(s4) --01----|
.concat(s4) --01------------0123456789
takeUntil(s5) --01------|
.concat(s5) --01----------------0123456789
现在,如果您使用 share()
,您可以有效地发布源代码。发布意味着您同时向所有订阅者进行多播。如果有 2 个订阅者,这会很好地工作,即使一个订阅者比另一个订阅者晚到达,源将继续为第二个订阅者提供中流。当第一个订阅者在第二个订阅者到达之前断开连接时,情况就会发生变化。为了保护资源,share()
将断开源,稍后重新订阅。鉴于您从冷可观察对象开始,这意味着它们将在漫长的等待中重新开始。
由于您使用 .takeUntil(s2).concat(s2)
,您实际上会在再次订阅 s2
之前取消订阅 s2
。毕竟 concat
在从 takeUntil
接收到 completed
之前不会连接,并且 takeUntil
在 s2
产生之前不会发出 completed
。如果 s2
屈服,takeUntil 将在转发 completed
之前立即取消订阅。这意味着 s2 将在一瞬间没有订阅者,源将被重置。
您可能期望的是 s2
会一直保持连接状态,并会在后台保持 运行ning。如果您使用的是来自活动源的热可观察对象而不是冷可观察对象制成的热槽,这将起作用 share()
.
我不会详细介绍 switch()
因为我相信你已经理解了那里的问题:它会在下一个到达时断开前一个源,而不是在下一个产生时断开。
你能做的,就是写你自己的'switchOnYield'
source.publish(src => src
.flatMap(inner1 =>
inner1.takeUntil(src.flatMap(inner2 => inner2.take(1)))
))
这样做是将 source 中的所有来源合并在一起,但在它们上面添加 takeUntil
和所有后面的来源。如果任何后来的资源产生,第一批将被取消订阅。这是有效的,因为第一次 src
产生,.flatMap(inner1
将 运行。第二次 yield,src.flatMap(inner2
会将后面源中的任何项目合并到 takeUntil
运算符中。
我强烈建议您 post 预期或期望的输出。目前尚不清楚你想要什么。对于 C# 示例,将源代码更改为以下内容会让您更接近(我认为,我不确定):
var source = Observable.Range(1, 5)
.Select(i => originalSequence
.Delay(TimeSpan.FromSeconds(2 * i))
.Select(x => "source " + i + ": " + x)
)
//making sure s2 is shared properly, thus concated properly
.Aggregate(empty, (s1, s2) => s2.Publish( _s2 => s1
.TakeUntil(_s2)
.Concat(_s2)
))
.SelectMany(x => x);
这会产生以下输出:
Next: source 1: 0
Next: source 1: 1
Next: source 2: 1
Next: source 3: 1
Next: source 4: 1
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
这对我来说很有意义。如果你能 post 想要的输出,我会帮助你。