如何使用不同的参数确保结果计数和缓存
How to ensure result count and caching with varying parameters
我有一个 API 端点,它可以根据请求参数计算不同的结果。参数为 page
、per_page
、query
等。
fun getItems(params : Map<String, String>) : Single<ItemsResponse>
data class ItemsResponse(
val hasMore : Boolean,
val items : List<Items>
)
API 不可信,可能 return 小于 per_page
。我想确保我总是得到我需要的结果计数并缓存下一个请求周期的剩余部分。
例如
val page : Int = 1
fun fetchItems(requestedItems : Int = 20) : Single<List<Items>> {
...
.map { buildParams(page, perPage, query) }
.flatMap { api.getItems(it) }
.doOnSuccess { page++ }
.buffer(requestedItems)
}
fun buildParams(page: Int, perPage: Int, query : String) : Map<String, String> {
...
}
示例场景:
- 来电者第一次请求 20 个项目。
- 用
page: 1
调用api.getItems()
,per_page
总是20。
- 调用 returns 16 项
- 用
page: 2
呼叫 api.getItems()
- 调用return 19 项
- 20 项已 return 发送给调用者,其余 15 项已缓存以供下一次调用者请求。
- 来电者第二次请求 20 个项目。
- 用
page: 3
呼叫 api.getItems()
- 调用 returns 12 项
- 20 个项目被 return 发送给调用者(15 个旧项目和 5 个来自上次响应),其余 7 个项目被缓存用于下一个调用者请求。
- 依此类推。
这看起来像生产者-消费者模式,但在 RxJava2
中可行吗?
编辑: 根据附加信息
需要:RxJava 2 扩展库:compile "com.github.akarnokd:rxjava2-extensions:0.17.0"
import hu.akarnokd.rxjava2.expr.StatementObservable
import io.reactivex.Observable
import io.reactivex.functions.BooleanSupplier
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ThreadLocalRandom
var counter = 0;
fun service() : Observable<String> {
return Observable.defer(Callable {
val n = ThreadLocalRandom.current().nextInt(21)
val c = counter++;
Observable.range(1, n).map({ v -> "" + c + " | " + v })
})
}
fun getPage(pageSignal : Observable<Int>, pageSize: Int) : Observable<List<String>> {
return Observable.defer(Callable {
val queue = ConcurrentLinkedQueue<String>()
pageSignal.concatMap({ _ ->
StatementObservable.whileDo(
service()
.toList()
.doOnSuccess({ v -> v.forEach { queue.offer(it) }})
.toObservable()
, BooleanSupplier { queue.size < pageSize })
.ignoreElements()
.andThen(
Observable.range(1, pageSize)
.concatMap({ _ ->
val o = queue.poll();
if (o == null) {
Observable.empty()
} else {
Observable.just(o)
}
})
.toList()
.toObservable()
)
})
})
}
fun main(args: Array<String>) {
val pages = PublishSubject.create<Int>();
getPage(pages, 20)
.subscribe({ println(it) }, { it.printStackTrace() })
pages.onNext(1)
pages.onNext(2)
}
我有一个 API 端点,它可以根据请求参数计算不同的结果。参数为 page
、per_page
、query
等。
fun getItems(params : Map<String, String>) : Single<ItemsResponse>
data class ItemsResponse(
val hasMore : Boolean,
val items : List<Items>
)
API 不可信,可能 return 小于 per_page
。我想确保我总是得到我需要的结果计数并缓存下一个请求周期的剩余部分。
例如
val page : Int = 1
fun fetchItems(requestedItems : Int = 20) : Single<List<Items>> {
...
.map { buildParams(page, perPage, query) }
.flatMap { api.getItems(it) }
.doOnSuccess { page++ }
.buffer(requestedItems)
}
fun buildParams(page: Int, perPage: Int, query : String) : Map<String, String> {
...
}
示例场景:
- 来电者第一次请求 20 个项目。
- 用
page: 1
调用api.getItems()
,per_page
总是20。- 调用 returns 16 项
- 用
page: 2
呼叫api.getItems()
- 调用return 19 项
- 20 项已 return 发送给调用者,其余 15 项已缓存以供下一次调用者请求。
- 来电者第二次请求 20 个项目。
- 用
page: 3
呼叫api.getItems()
- 调用 returns 12 项
- 20 个项目被 return 发送给调用者(15 个旧项目和 5 个来自上次响应),其余 7 个项目被缓存用于下一个调用者请求。
- 依此类推。
这看起来像生产者-消费者模式,但在 RxJava2
中可行吗?
编辑: 根据附加信息
需要:RxJava 2 扩展库:compile "com.github.akarnokd:rxjava2-extensions:0.17.0"
import hu.akarnokd.rxjava2.expr.StatementObservable
import io.reactivex.Observable
import io.reactivex.functions.BooleanSupplier
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ThreadLocalRandom
var counter = 0;
fun service() : Observable<String> {
return Observable.defer(Callable {
val n = ThreadLocalRandom.current().nextInt(21)
val c = counter++;
Observable.range(1, n).map({ v -> "" + c + " | " + v })
})
}
fun getPage(pageSignal : Observable<Int>, pageSize: Int) : Observable<List<String>> {
return Observable.defer(Callable {
val queue = ConcurrentLinkedQueue<String>()
pageSignal.concatMap({ _ ->
StatementObservable.whileDo(
service()
.toList()
.doOnSuccess({ v -> v.forEach { queue.offer(it) }})
.toObservable()
, BooleanSupplier { queue.size < pageSize })
.ignoreElements()
.andThen(
Observable.range(1, pageSize)
.concatMap({ _ ->
val o = queue.poll();
if (o == null) {
Observable.empty()
} else {
Observable.just(o)
}
})
.toList()
.toObservable()
)
})
})
}
fun main(args: Array<String>) {
val pages = PublishSubject.create<Int>();
getPage(pages, 20)
.subscribe({ println(it) }, { it.printStackTrace() })
pages.onNext(1)
pages.onNext(2)
}