RxJava:结合冷热可观察以相互等待
RxJava: Combining hot and cold observable to wait for each other
我的观察值是这样定义的
val initLoading = Observable.fromCallable { println("${System.currentTimeMillis()}") }
.subscribeOn(Schedulers.computation())
.delay(WAIT_TIME, TimeUnit.SECONDS)
.map { "loading ${System.currentTimeMillis()}" }
.observeOn(AndroidSchedulers.mainThread())
val click = RxView.clicks(button).map { "click ${System.currentTimeMillis()}" }
initLoading.concatWith(click)
.subscribeBy(
onNext = { println("result $it") },
onError = { throw it }
)
initialLoading
在 Activity 的 onCreate
方法处启动 运行。 click
在按钮单击时执行。我有两个案例,第一个有效,第二个无效。
案例 1
activity 启动并在 WAIT_TIME
秒后单击按钮。输出:
01-23 13:08:07.170 I/System.out: 1516698487170
01-23 13:08:17.174 I/System.out: result loading 1516698497172
01-23 13:08:29.258 I/System.out: result click 1516698509258
案例2
activity 启动并单击按钮 before WAIT_TIME
期间结束。输出
01-23 13:09:07.392 I/System.out: 1516698547392
01-23 13:09:17.398 I/System.out: result loading 1516698557395
所以,问题是点击事件丢失了。我希望点击事件等待加载,然后继续工作。简而言之,案例 2 的输出应该与案例 1 相同。
我如何使用 rx 运算符来切碎它。我试过 merge
但它只是结合了两者并且点击事件不等待加载。
我也试过 reply, cache, publish, share
但无法按照我的意愿将它们正确组合。
尝试 combineLatest 而不是 concatWith。 "result loading" 总是一样的
concatWith
运算符适合您的用例,但第二个可观察对象应在创建后立即开始存储点击事件,以便在订阅可观察对象时可以发出存储的事件(这会发生当 initLoading
完成时)。这可以通过使用 replay()
和 connect()
.
修改您的 click
observable 来实现
val replayedClicks = click.replay();
replayedClicks.connect(); // The original click observable is subscribed to at this point
现在您可以在 concatWith
中使用 replayedClicks
,其存储的事件将在 initLoading
完成后重播:
initLoading.concatWith(replayedClicks)
.subscribeBy(
onNext = { println("result $it") },
onError = { throw it }
)
我的观察值是这样定义的
val initLoading = Observable.fromCallable { println("${System.currentTimeMillis()}") }
.subscribeOn(Schedulers.computation())
.delay(WAIT_TIME, TimeUnit.SECONDS)
.map { "loading ${System.currentTimeMillis()}" }
.observeOn(AndroidSchedulers.mainThread())
val click = RxView.clicks(button).map { "click ${System.currentTimeMillis()}" }
initLoading.concatWith(click)
.subscribeBy(
onNext = { println("result $it") },
onError = { throw it }
)
initialLoading
在 Activity 的 onCreate
方法处启动 运行。 click
在按钮单击时执行。我有两个案例,第一个有效,第二个无效。
案例 1
activity 启动并在 WAIT_TIME
秒后单击按钮。输出:
01-23 13:08:07.170 I/System.out: 1516698487170
01-23 13:08:17.174 I/System.out: result loading 1516698497172
01-23 13:08:29.258 I/System.out: result click 1516698509258
案例2
activity 启动并单击按钮 before WAIT_TIME
期间结束。输出
01-23 13:09:07.392 I/System.out: 1516698547392
01-23 13:09:17.398 I/System.out: result loading 1516698557395
所以,问题是点击事件丢失了。我希望点击事件等待加载,然后继续工作。简而言之,案例 2 的输出应该与案例 1 相同。
我如何使用 rx 运算符来切碎它。我试过 merge
但它只是结合了两者并且点击事件不等待加载。
我也试过 reply, cache, publish, share
但无法按照我的意愿将它们正确组合。
尝试 combineLatest 而不是 concatWith。 "result loading" 总是一样的
concatWith
运算符适合您的用例,但第二个可观察对象应在创建后立即开始存储点击事件,以便在订阅可观察对象时可以发出存储的事件(这会发生当 initLoading
完成时)。这可以通过使用 replay()
和 connect()
.
click
observable 来实现
val replayedClicks = click.replay();
replayedClicks.connect(); // The original click observable is subscribed to at this point
现在您可以在 concatWith
中使用 replayedClicks
,其存储的事件将在 initLoading
完成后重播:
initLoading.concatWith(replayedClicks)
.subscribeBy(
onNext = { println("result $it") },
onError = { throw it }
)