你如何创建一个在 Rx 中的列表上工作的计时器?
How can you create a timer that works on a List in Rx?
我想在完成之前查找要找到的完整项目列表,如果未找到整个列表,则会抛出异常(超时或自定义异常)。与内置的 Observable.timer() 一样,但我希望它要求找到列表中的所有项目,而不是在发出第一个项目后通过测试。
这是一个例子。假设我有一些发射 Observable 的测试函数。它看起来像这样:
var emittedList: List<String?> = listOf(null, "202", "302", "400")
data class FoundNumber(val numberId: String?)
fun scanNumbers(): Observable<FoundNumber> = Observable
.intervalRange(0,
emittedList.size.toLong(),
0,
1,
TimeUnit.SECONDS).map { index ->
FoundNumber(emittedList[index.toInt()]) }
然后将调用该函数以获取将与预期数字列表进行比较的数字。如果有来自 scanForNumbers 的额外号码不在 "target" 列表中,这无关紧要。他们只会被忽略。像这样:
val expectedNumbers = listOf("202", "302","999")
scanForNumbers(expectedNumbers)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe { value -> Log.d(TAG, "Was returned a $value") }
因此,预期数字(202、302 和 999)与将发出的数字(202、302 和 400)不完全匹配。因此,超时应该发生,但是使用 Observable.timer() 的内置版本,它不会超时,因为至少观察到一个项目。
这是我想要的。任何人都知道如何在 RxJava/RxKotlin?
中编写代码
fun scanForNumbers(targets: List<String>): Observable<FoundNumber> {
val accumulator: Pair<Set<Any>, FoundNumber?> = targets.toSet() to null
return scanNumbers()
.SPECIAL_TIMEOUT_FOR_LIST(5, TimeUnit.SECONDS, List)
.scan(accumulator) { acc, next ->
val (set, previous) = acc
val stringSet:MutableSet<String> = hashSetOf()
set.forEach { stringSet.add(it.toString()) }
val item = if (next.numberId in stringSet) {
next
} else null
(set - next) to item // return set and nullable item
}
.filter { Log.d(TAG, "Filtering on ${it.second}")
it.second != null } // item not null
.take(targets.size.toLong()) // limit to the number of items
.map { it.second } // unwrap the item from the pair
.map { FoundController(it.numberId) } // wrap in your class
}
你如何编码,希望使用 RxJava/Kotlin,一种在列表上超时的方法?
我想我现在明白了,您希望超时从您订阅的那一刻开始计算,而不是在您观察项目之后。
如果这是您的需要,那么 takeUntil
运算符可以帮助您:
return scanNumbers()
.takeUntil(Observable.timer(5, TimeUnit.SECONDS))
.scan(accumulator) { acc, next -> ...
在这种情况下,计时器将在您订阅后立即开始计时。如果 main observable 在此之前完成那么很好,如果没有,那么计时器无论如何都会完成 main observable。
但是 takeUntil
本身不会抛出错误,它只会完成。如果您需要它以错误结束,那么您可以使用以下组合:
return scanNumbers()
.takeUntil(
Observable
.error<Void>(new TimeoutError("timeout!"))
.delay(5, TimeUnit.SECONDS, true))
.scan(accumulator) { acc, next -> ...
我想在完成之前查找要找到的完整项目列表,如果未找到整个列表,则会抛出异常(超时或自定义异常)。与内置的 Observable.timer() 一样,但我希望它要求找到列表中的所有项目,而不是在发出第一个项目后通过测试。
这是一个例子。假设我有一些发射 Observable
var emittedList: List<String?> = listOf(null, "202", "302", "400")
data class FoundNumber(val numberId: String?)
fun scanNumbers(): Observable<FoundNumber> = Observable
.intervalRange(0,
emittedList.size.toLong(),
0,
1,
TimeUnit.SECONDS).map { index ->
FoundNumber(emittedList[index.toInt()]) }
然后将调用该函数以获取将与预期数字列表进行比较的数字。如果有来自 scanForNumbers 的额外号码不在 "target" 列表中,这无关紧要。他们只会被忽略。像这样:
val expectedNumbers = listOf("202", "302","999")
scanForNumbers(expectedNumbers)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe { value -> Log.d(TAG, "Was returned a $value") }
因此,预期数字(202、302 和 999)与将发出的数字(202、302 和 400)不完全匹配。因此,超时应该发生,但是使用 Observable.timer() 的内置版本,它不会超时,因为至少观察到一个项目。
这是我想要的。任何人都知道如何在 RxJava/RxKotlin?
中编写代码fun scanForNumbers(targets: List<String>): Observable<FoundNumber> {
val accumulator: Pair<Set<Any>, FoundNumber?> = targets.toSet() to null
return scanNumbers()
.SPECIAL_TIMEOUT_FOR_LIST(5, TimeUnit.SECONDS, List)
.scan(accumulator) { acc, next ->
val (set, previous) = acc
val stringSet:MutableSet<String> = hashSetOf()
set.forEach { stringSet.add(it.toString()) }
val item = if (next.numberId in stringSet) {
next
} else null
(set - next) to item // return set and nullable item
}
.filter { Log.d(TAG, "Filtering on ${it.second}")
it.second != null } // item not null
.take(targets.size.toLong()) // limit to the number of items
.map { it.second } // unwrap the item from the pair
.map { FoundController(it.numberId) } // wrap in your class
}
你如何编码,希望使用 RxJava/Kotlin,一种在列表上超时的方法?
我想我现在明白了,您希望超时从您订阅的那一刻开始计算,而不是在您观察项目之后。
如果这是您的需要,那么 takeUntil
运算符可以帮助您:
return scanNumbers()
.takeUntil(Observable.timer(5, TimeUnit.SECONDS))
.scan(accumulator) { acc, next -> ...
在这种情况下,计时器将在您订阅后立即开始计时。如果 main observable 在此之前完成那么很好,如果没有,那么计时器无论如何都会完成 main observable。
但是 takeUntil
本身不会抛出错误,它只会完成。如果您需要它以错误结束,那么您可以使用以下组合:
return scanNumbers()
.takeUntil(
Observable
.error<Void>(new TimeoutError("timeout!"))
.delay(5, TimeUnit.SECONDS, true))
.scan(accumulator) { acc, next -> ...