订阅现有的可观察对象,除非它已完成

Subscribe to existing observable unless it's completed

我已经好几次需要这样的结构了,但我不太清楚如何实现它。我的问题是:当 A 发生时,我想创建一个复杂的可观察对象(通过组合几个运算符制成)。它将异步完成一些动作,发布结果并完成。同时我想允许新的订阅这个 observable,但是一旦它完成,就应该创建新的 observable,它是第一个的副本(或者只是做同样的事情)。

(edit) 作为一个例子,让我们有一个简单的 observable:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)。我的目标是以下行为:

[毫秒:动作]

0: obs.subscribe(...) - 我希望这个 observable 在 ~1s

之后完成

500:obs.subscribe(...) - 这个应该在 ~500 毫秒后完成

950:同上,应该在 50 毫秒后完成

1500:原来的observable应该已经完成​​了。我现在想重新开始一切,并在 1s

后完成这里的订阅

2000:在这里我想连接到最新的可观察到的并期望它在 500 秒后完成(因为新的秒数从 1500 开始)

我不太清楚如何以正确且线程安全的方式进行操作。我可以用一个 observable 来做吗?

您可以使用 defershare 来实现。

Observable<Long> o = Observable.defer(() ->
    Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();

o.subscribe(System.out::println);   // T = 0
Thread.sleep(500);
o.subscribe(System.out::println);  // T = 500
Thread.sleep(450);
o.subscribe(System.out::println);   // T = 950

Thread.sleep(550);
o.subscribe(System.out::println);   // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println);   // T == 2000

Thread.sleep(1000);

前三个将在 1 秒后同时完成(具有相同的值),后两个将在第一个后 1.5 秒完成(与第一个具有不同的值)。