订阅现有的可观察对象,除非它已完成
Subscribe to existing observable unless it's completed
我已经好几次需要这样的结构了,但我不太清楚如何实现它。我的问题是:当 A 发生时,我想创建一个复杂的可观察对象(通过组合几个运算符制成)。它将异步完成一些动作,发布结果并完成。同时我想允许新的订阅这个 observable,但是一旦它完成,就应该创建新的 observable,它是第一个的副本(或者只是做同样的事情)。
(edit) 作为一个例子,让我们有一个简单的 observable:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)
。我的目标是以下行为:
[毫秒:动作]
0: obs.subscribe(...)
- 我希望这个 observable 在 ~1s
之后完成
500:obs.subscribe(...)
- 这个应该在 ~500 毫秒后完成
950:同上,应该在 50 毫秒后完成
1500:原来的observable应该已经完成了。我现在想重新开始一切,并在 1s
后完成这里的订阅
2000:在这里我想连接到最新的可观察到的并期望它在 500 秒后完成(因为新的秒数从 1500 开始)
我不太清楚如何以正确且线程安全的方式进行操作。我可以用一个 observable 来做吗?
您可以使用 defer
和 share
来实现。
Observable<Long> o = Observable.defer(() ->
Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();
o.subscribe(System.out::println); // T = 0
Thread.sleep(500);
o.subscribe(System.out::println); // T = 500
Thread.sleep(450);
o.subscribe(System.out::println); // T = 950
Thread.sleep(550);
o.subscribe(System.out::println); // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println); // T == 2000
Thread.sleep(1000);
前三个将在 1 秒后同时完成(具有相同的值),后两个将在第一个后 1.5 秒完成(与第一个具有不同的值)。
我已经好几次需要这样的结构了,但我不太清楚如何实现它。我的问题是:当 A 发生时,我想创建一个复杂的可观察对象(通过组合几个运算符制成)。它将异步完成一些动作,发布结果并完成。同时我想允许新的订阅这个 observable,但是一旦它完成,就应该创建新的 observable,它是第一个的副本(或者只是做同样的事情)。
(edit) 作为一个例子,让我们有一个简单的 observable:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)
。我的目标是以下行为:
[毫秒:动作]
0: obs.subscribe(...)
- 我希望这个 observable 在 ~1s
500:obs.subscribe(...)
- 这个应该在 ~500 毫秒后完成
950:同上,应该在 50 毫秒后完成
1500:原来的observable应该已经完成了。我现在想重新开始一切,并在 1s
后完成这里的订阅2000:在这里我想连接到最新的可观察到的并期望它在 500 秒后完成(因为新的秒数从 1500 开始)
我不太清楚如何以正确且线程安全的方式进行操作。我可以用一个 observable 来做吗?
您可以使用 defer
和 share
来实现。
Observable<Long> o = Observable.defer(() ->
Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();
o.subscribe(System.out::println); // T = 0
Thread.sleep(500);
o.subscribe(System.out::println); // T = 500
Thread.sleep(450);
o.subscribe(System.out::println); // T = 950
Thread.sleep(550);
o.subscribe(System.out::println); // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println); // T == 2000
Thread.sleep(1000);
前三个将在 1 秒后同时完成(具有相同的值),后两个将在第一个后 1.5 秒完成(与第一个具有不同的值)。