使用一个可观察对象来同步另一个可观察对象
Using one observable to synchronize another observable
我想使用另一个用作时钟的 observable 来同步一个 observable,下面是一个示例。
Main: ---------abc----------------------------------
Clock: -x-----x-----x-----x-----x-----x-----x-----x--
Expected: -------------a-----b-----c--------------------
我尝试使用 Zip 方法实现此同步,类似于 RX 文档中描述的示例 (http://reactivex.io/documentation/operators/zip.html):
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)
问题是当我测试这个实现时,它不起作用。下面是我为检查预期行为而编写的测试:
scheduler = new TestScheduler();
var mainValues = scheduler.CreateHotObservable(
new Recorded<Notification<char>>(100, Notification.CreateOnNext('a')),
new Recorded<Notification<char>>(101, Notification.CreateOnNext('b')),
new Recorded<Notification<char>>(102, Notification.CreateOnNext('c')),
new Recorded<Notification<char>>(103, Notification.CreateOnNext('d')),
new Recorded<Notification<char>>(104, Notification.CreateOnNext('e')),
new Recorded<Notification<char>>(105, Notification.CreateOnNext('f')),
new Recorded<Notification<char>>(106, Notification.CreateOnNext('g')),
new Recorded<Notification<char>>(107, Notification.CreateOnCompleted<char>()));
var clockValues = scheduler.CreateHotObservable(
new Recorded<Notification<long>>(70, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(90, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(110, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(130, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(150, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(170, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(190, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(210, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(230, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(250, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(270, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(290, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(310, Notification.CreateOnCompleted<long>()));
var res = scheduler.Start(() => mainValues.Zip(clockValues, (mainValue, clockValue) => mainValue), 0, 70, long.MaxValue);
下面是预期值和我真正得到的(描述为评论):
res.Messages.AssertEqual(
OnNext(110, 'a'), // Expected: 110, a - Actual: 100, a
OnNext(130, 'b'), // Expected: 130, b - Actual: 110, b
OnNext(150, 'c'), // Expected: 150, c - Actual: 130, c
OnNext(170, 'd'), // Expected: 170, d - Actual: 150, d
OnNext(190, 'e'), // Expected: 190, e - Actual: 170, e
OnNext(210, 'f'), // Expected: 210, f - Actual: 190, f
OnNext(230, 'g'));// Expected: 230, g - Actual: 210, g
有什么问题吗?使用 Zip 同步两个 observable 是否正确?我是否错误地使用了 TestScheduler?
是否改变
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValues)
...至:
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)
修复它?
试一试:
var query =
Observable
.Create<char?>(o =>
{
IDisposable inner = null;
IDisposable subscription =
mainValues
.Publish(mvs =>
{
var q = new System.Collections.Generic.Queue<char>();
inner = mvs.Subscribe(mv => q.Enqueue(mv));
return clockValues.Select(x => q.Count > 0 ? q.Dequeue() : (char?)null);
})
.Subscribe(o);
return new CompositeDisposable(inner, subscription);
});
query.Subscribe(x => Console.WriteLine(x));
scheduler.Start();
让我知道这是否按您希望的方式工作。如果是这样,我会弹出一些解释。
我想使用另一个用作时钟的 observable 来同步一个 observable,下面是一个示例。
Main: ---------abc----------------------------------
Clock: -x-----x-----x-----x-----x-----x-----x-----x--
Expected: -------------a-----b-----c--------------------
我尝试使用 Zip 方法实现此同步,类似于 RX 文档中描述的示例 (http://reactivex.io/documentation/operators/zip.html):
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)
问题是当我测试这个实现时,它不起作用。下面是我为检查预期行为而编写的测试:
scheduler = new TestScheduler();
var mainValues = scheduler.CreateHotObservable(
new Recorded<Notification<char>>(100, Notification.CreateOnNext('a')),
new Recorded<Notification<char>>(101, Notification.CreateOnNext('b')),
new Recorded<Notification<char>>(102, Notification.CreateOnNext('c')),
new Recorded<Notification<char>>(103, Notification.CreateOnNext('d')),
new Recorded<Notification<char>>(104, Notification.CreateOnNext('e')),
new Recorded<Notification<char>>(105, Notification.CreateOnNext('f')),
new Recorded<Notification<char>>(106, Notification.CreateOnNext('g')),
new Recorded<Notification<char>>(107, Notification.CreateOnCompleted<char>()));
var clockValues = scheduler.CreateHotObservable(
new Recorded<Notification<long>>(70, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(90, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(110, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(130, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(150, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(170, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(190, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(210, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(230, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(250, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(270, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(290, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(310, Notification.CreateOnCompleted<long>()));
var res = scheduler.Start(() => mainValues.Zip(clockValues, (mainValue, clockValue) => mainValue), 0, 70, long.MaxValue);
下面是预期值和我真正得到的(描述为评论):
res.Messages.AssertEqual(
OnNext(110, 'a'), // Expected: 110, a - Actual: 100, a
OnNext(130, 'b'), // Expected: 130, b - Actual: 110, b
OnNext(150, 'c'), // Expected: 150, c - Actual: 130, c
OnNext(170, 'd'), // Expected: 170, d - Actual: 150, d
OnNext(190, 'e'), // Expected: 190, e - Actual: 170, e
OnNext(210, 'f'), // Expected: 210, f - Actual: 190, f
OnNext(230, 'g'));// Expected: 230, g - Actual: 210, g
有什么问题吗?使用 Zip 同步两个 observable 是否正确?我是否错误地使用了 TestScheduler?
是否改变
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValues)
...至:
mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)
修复它?
试一试:
var query =
Observable
.Create<char?>(o =>
{
IDisposable inner = null;
IDisposable subscription =
mainValues
.Publish(mvs =>
{
var q = new System.Collections.Generic.Queue<char>();
inner = mvs.Subscribe(mv => q.Enqueue(mv));
return clockValues.Select(x => q.Count > 0 ? q.Dequeue() : (char?)null);
})
.Subscribe(o);
return new CompositeDisposable(inner, subscription);
});
query.Subscribe(x => Console.WriteLine(x));
scheduler.Start();
让我知道这是否按您希望的方式工作。如果是这样,我会弹出一些解释。