流进行过滤,直到另一个流发出,但一旦第二个流发出,就重新发出那些过滤后的排放

Stream to filter until another stream emits, but reemit those filtered emissions once 2nd stream emits

我有一个 Foo 流。 Foo 发出要求 Android 视图进行布局(宽度和高度 > 0)。我为此使用 RxBinding 即

fooOservable()
   .subscribe(foo -> {});

RxView.preDraws(mPager, () -> true)
                    .take(1);

当这个 observable 发出(或完成,因为拍摄)视图被布局时。

我需要的是让 fooObservable() 等到 RxView 发出 ,即视图已布局。我不能只是 fooObservable.filter( width && height > 0 ),因为那样会降低发射。 我需要的是缓存 foo 的最后一次发射(如果视图未布局)并在第一个 RxView.preDraws. 之后重新发射,如果它被布局,它应该正常通过

您可以使用 delaydelaySubsription。有些形式会延迟发射或订阅,直到另一个可观察对象发射某些东西。在第一种情况下,您只会有发射延迟,在第二种情况下,整个订阅都会延迟。

我已经编写了一个示例,使用延迟来等待另一个可观察对象,然后再发出您的数据。

class SimpleTest {
  val testScheduler = TestScheduler()

  @Test
  fun test() {
    fooObservable()
        .doOnNext { logger("Next", it.toString()) }
        .delay { Observable.timer(5, TimeUnit.SECONDS, testScheduler) }
        .subscribe { logger("Delayed", it.toString()) }

    testScheduler.advanceTimeBy(10, TimeUnit.SECONDS)
  }

  fun fooObservable(): Observable<Int> {
    return Observable.just(1, 2, 3, 4)
  }

  fun logger(tag: String, message: String): Unit {
    val formattedDate = Date(testScheduler.now()).format()
    System.out.println("$tag @ $formattedDate: $message")
  }

  fun Date.format(): String {
    return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
  }
}

我使用了一个 5 秒计时器来模拟您的辅助可观察对象。

此代码打印输出:

Next @ 21:00:00.000: 1
Next @ 21:00:00.000: 2
Next @ 21:00:00.000: 3
Next @ 21:00:00.000: 4
Delayed @ 21:00:05.000: 1
Delayed @ 21:00:05.000: 2
Delayed @ 21:00:05.000: 3
Delayed @ 21:00:05.000: 4