按照其元素的可用性顺序组合可观察对象

Combine observables in the order of availability of its elements

我正在尝试 uild 像 s.startWith(x) 这样的运算符,但是有条件的运算符 - 让我们称之为 s.startWithIfNothingAvailable(x)。我希望它仅在订阅时 s 没有可用元素 时才使用 x 作为前缀。

让我用一个例子来说明这个想法。

s 是来自服务器的报告流。

我认为解决这个问题的另一种方法是使用类似 .concat 的方法,但它根据元素的可用性对可观察对象进行排序。

Observable.concatFirstAvailable(serverReport, emptyReport),如果 serverReport 还没有元素 - 切换到 emptyReport,然后返回等待 serverReport

如果我正确理解了你的要求,那么你可以使用 startWith() and then sample()

例如采样时间为 50ms。如果在前 50 毫秒内没有报告到达,则将使用 startWith 元素并呈现空状态。否则,将使用最新报告。

Sample 还将确保您不会尝试过于频繁地更新 UI:例如当服务器在 50 毫秒内发送 2 个报告时,您不想同时呈现两个报告,只呈现最新的一个。

您可以合并延迟的特殊报告项目:

// imitate infinite hot service
PublishSubject<Report> service = PublishSubject.create();

// special report indicating the service has no reports
Report NO_REPORT = new Report();

AtomicBoolean hasValue = new AtomicBoolean();

service
// we'll need the main value for both emission and control message
.publish(main ->
     // this will keep "listening" to main and allow a timeout as well
     main.mergeWith(
         // signal the empty report indicator
         Observable.just(NO_REPORT)
         // after some grace period so main can emit a real report
         .delay(100, TimeUnit.MILLISECONDS)
         // but if the main emits first, don't signal the empty report
         .takeUntil(main)
     )
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(report -> {
     if (report == NO_REPORT) {
         // even if this onNext is serialized, NO_REPORT may get emitted
         if (!hasValue.get()) {
             // display empty report
         }
     } else {
         // this indicates a NO_REPORT should be ignored onward
         hasValue.set(true);
         // display normal report
     }
}, error -> {  /* show error */ })

Thread.sleep(200); // Thread.sleep(50)
service.onNext(new Report());