如何使用不同的参数确保结果计数和缓存

How to ensure result count and caching with varying parameters

我有一个 API 端点,它可以根据请求参数计算不同的结果。参数为 pageper_pagequery 等。

fun getItems(params : Map<String, String>) : Single<ItemsResponse>

data class ItemsResponse(
  val hasMore : Boolean,
  val items : List<Items>
)

API 不可信,可能 return 小于 per_page。我想确保我总是得到我需要的结果计数并缓存下一个请求周期的剩余部分。
例如

val page : Int = 1

fun fetchItems(requestedItems : Int = 20) : Single<List<Items>> {
    ...
    .map { buildParams(page, perPage, query) }
    .flatMap { api.getItems(it) }
    .doOnSuccess { page++ }
    .buffer(requestedItems)
}

fun buildParams(page: Int, perPage: Int, query : String) : Map<String, String> { 
   ...
}

示例场景:

这看起来像生产者-消费者模式,但在 RxJava2 中可行吗?

编辑: 根据附加信息

需要:RxJava 2 扩展库:compile "com.github.akarnokd:rxjava2-extensions:0.17.0"

import hu.akarnokd.rxjava2.expr.StatementObservable
import io.reactivex.Observable
import io.reactivex.functions.BooleanSupplier
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ThreadLocalRandom

var counter = 0;

fun service() : Observable<String> {
    return Observable.defer(Callable {
        val n = ThreadLocalRandom.current().nextInt(21)
        val c = counter++;
        Observable.range(1, n).map({ v -> "" + c + " | " + v })
    })
}

fun getPage(pageSignal : Observable<Int>, pageSize: Int) : Observable<List<String>> {
    return Observable.defer(Callable {
        val queue = ConcurrentLinkedQueue<String>()

        pageSignal.concatMap({ _ ->
            StatementObservable.whileDo(
                    service()
                            .toList()
                            .doOnSuccess({ v -> v.forEach { queue.offer(it) }})
                            .toObservable()
                    , BooleanSupplier { queue.size < pageSize })
                    .ignoreElements()
                    .andThen(
                            Observable.range(1, pageSize)
                                    .concatMap({ _ ->
                                        val o = queue.poll();
                                        if (o == null) {
                                            Observable.empty()
                                        } else {
                                            Observable.just(o)
                                        }
                                    })
                                    .toList()
                                    .toObservable()
                    )
        })
    })
}

fun main(args: Array<String>) {

    val pages = PublishSubject.create<Int>();

    getPage(pages, 20)
            .subscribe({ println(it) }, { it.printStackTrace() })

    pages.onNext(1)

    pages.onNext(2)
}