如何使用 RxJava 对来自异步源的数据进行分组

How to group data from async sources using RxJava

我正在开发一个交易应用程序。当用户 select 一些产品时,我需要为每个产品显示市场是否开放及其最新价格。用户可以 select 例如 2 个产品,而我要做的是仅当我拥有 2 个产品的所有信息时才显示数据。数据可以随时更改(即其中一种产品的市场关闭)。这是我的代码:

data class ProductInfo(
    val productCode: String,
    val isMarketOpen: Boolean,
    val latestPrice: BigDecimal,
)

// This observable could emit at any time due to user interaction with the UI
private fun productsCodeObservable(): Observable<List<String>> = Observable.just(listOf("ProductA", "ProductB"))

// Markets have different working hours
private fun isMarketOpenObservable(productCode: String): Observable<Boolean> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map {
                // TODO: Use API to determine if the market is open for productCode
                it.toInt() % 2 == 0
            }
}

// The product price fluctuates so this emits every X seconds
private fun latestPriceObservable(productCode: String): Observable<BigDecimal> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map { productPrice -> productPrice.toBigDecimal() }
}

@Test
fun `test`() {

    val countDownLatch = CountDownLatch(1)

    productsCodeObservable()
            .switchMap { Observable.fromIterable(it) }
            .flatMap { productCode ->
                Observable.combineLatest(
                        isMarketOpenObservable(productCode),
                        latestPriceObservable(productCode)
                ) { isMarketOpen, latestPrice ->
                    ProductInfo(productCode, isMarketOpen, latestPrice)
                }
            }
            .toList()
            .doOnSuccess { productsInfo ->
                println(productsInfo)
            }
            .subscribe()

    countDownLatch.await()
}

我不知道是什么问题,因为测试方法从不打印任何东西。我对 RxJava 了解不多,但我的理解是 toList 不起作用,因为源可观察对象永远不会完成。关于如何收集产品代码的数据并在任何数据更改时发出列表的任何想法? :)

使用 RxJava 的 Emitter 接口并实现其方法:

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

我认为您需要一个 ObservableEmitter,请查看以下页面: https://www.raywenderlich.com/2071847-reactive-programming-with-rxandroid-in-kotlin-an-introduction

如果您想在每次这些产品发生变化时收到新的产品信息列表:

productsCodeObservable()
    .switchMap { list ->
        val productInfoObservables = list.map { productCode ->
            Observable.combineLatest(
                isMarketOpenObservable(productCode),
                latestPriceObservable(productCode)
            ) { isMarketOpen, latestPrice ->
                ProductInfo(productCode, isMarketOpen, latestPrice)
            }
        }
        Observable.combineLatest(productInfoObservables) { it.toList() as List<ProductInfo> }
    }
    .doOnNext { productsInfoList ->
        println(productsInfoList)
    }
    .subscribe()