RxJava:结合冷热可观察以相互等待

RxJava: Combining hot and cold observable to wait for each other

我的观察值是这样定义的

    val initLoading = Observable.fromCallable { println("${System.currentTimeMillis()}") }
            .subscribeOn(Schedulers.computation())
            .delay(WAIT_TIME, TimeUnit.SECONDS)
            .map { "loading ${System.currentTimeMillis()}" }
            .observeOn(AndroidSchedulers.mainThread())

    val click = RxView.clicks(button).map { "click ${System.currentTimeMillis()}" }
    initLoading.concatWith(click)
            .subscribeBy(
                    onNext = { println("result $it") },
                    onError = { throw it }
            )

initialLoading 在 Activity 的 onCreate 方法处启动 运行。 click 在按钮单击时执行。我有两个案例,第一个有效,第二个无效。

案例 1

activity 启动并在 WAIT_TIME 秒后单击按钮。输出:

   01-23 13:08:07.170  I/System.out: 1516698487170
   01-23 13:08:17.174  I/System.out: result loading 1516698497172
   01-23 13:08:29.258  I/System.out: result click 1516698509258

案例2

activity 启动并单击按钮 before WAIT_TIME 期间结束。输出

   01-23 13:09:07.392 I/System.out: 1516698547392
   01-23 13:09:17.398 I/System.out: result loading 1516698557395

所以,问题是点击事件丢失了。我希望点击事件等待加载,然后继续工作。简而言之,案例 2 的输出应该与案例 1 相同。

我如何使用 rx 运算符来切碎它。我试过 merge 但它只是结合了两者并且点击事件不等待加载。

我也试过 reply, cache, publish, share 但无法按照我的意愿将它们正确组合。

尝试 combineLatest 而不是 concatWith。 "result loading" 总是一样的

concatWith 运算符适合您的用例,但第二个可观察对象应在创建后立即开始存储点击事件,以便在订阅可观察对象时可以发出存储的事件(这会发生当 initLoading 完成时)。这可以通过使用 replay()connect().

修改您的 click observable 来实现
val replayedClicks = click.replay();
replayedClicks.connect(); // The original click observable is subscribed to at this point

现在您可以在 concatWith 中使用 replayedClicks,其存储的事件将在 initLoading 完成后重播:

initLoading.concatWith(replayedClicks)
        .subscribeBy(
                onNext = { println("result $it") },
                onError = { throw it }
        )