Is this implementation of takeWhileInclusive safe?
I found the following implementation of takeWhileInclusive (found here):
fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}
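The problem is that I'm not 100% convinced this is safe when used on a parallel sequence.
My concern is that we rely on the shouldContinue variable to know when to stop, but we don't synchronize access to it.
Any insights?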
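A quick single-threaded check shows what `shouldContinue` is actually shared between. The sketch below copies the implementation above under a hypothetical name (`takeWhileInclusiveShared`) and iterates the returned sequence twice. `shouldContinue` is captured once per call to `takeWhileInclusive`, not once per iterator. As a result, the second pass starts from whatever state the first pass left behind. Two threads iterating the same returned sequence would likewise read and write that one unsynchronized variable.

```kotlin
// Copy of the implementation above under a hypothetical name, so it can run standalone.
fun <T> Sequence<T>.takeWhileInclusiveShared(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true // one variable per call, shared by every iterator of the result
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

// Hypothetical helper: iterates the same returned sequence twice.
fun twoPasses(): Pair<List<Int>, List<Int>> {
    val taken = sequenceOf(1, 2, 3, 4, 5).takeWhileInclusiveShared { it < 3 }
    val first = taken.toList()
    val second = taken.toList() // starts with shouldContinue left at false by the first pass
    return first to second
}
```

Here the first pass yields `[1, 2, 3]` and the second yields an empty list, because the first pass ended with `shouldContinue == false`.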
Here's what I've figured out so far.
Question clarification
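The question was unclear. There is no such thing as a parallel sequence; I probably confused them with Java's parallel streams. What I meant is a sequence consumed concurrently.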
Sequences are synchronous
As @LouisWasserman pointed out in the comments, sequences are not designed for parallel execution. In particular, SequenceBuilder is annotated with @RestrictsSuspension. Quoting from the Kotlin coroutines repo:
It means that no SequenceBuilder extension of lambda in its scope can invoke suspendContinuation or other general suspending function
That said, as @MarkoTopolnik commented, they can still be used in a parallel program just like any other object.
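The sequence builder runs synchronously and restarts its block for every new iterator. One possible way to get per-iteration state is therefore to write the operator with the `sequence { }` builder (Kotlin 1.3+, where `SequenceBuilder` became `SequenceScope`). This is a sketch, not the code from the question. The names `takeWhileInclusivePerIterator` and `pulledCount` are hypothetical. It does not make a single iterator safe to share across threads; it only stops separate iterations from sharing one flag. As a side effect, it stops right after the first failing element. The `takeWhile`-based version, by contrast, must pull one extra element to evaluate its predicate.

```kotlin
// Hypothetical name; a sketch of takeWhileInclusive with per-iterator state.
fun <T> Sequence<T>.takeWhileInclusivePerIterator(pred: (T) -> Boolean): Sequence<T> =
    sequence<T> {
        // This block runs again from the top for each new iterator,
        // so no flag is shared between iterations.
        for (item in this@takeWhileInclusivePerIterator) {
            yield(item)            // emit first: the failing element is included
            if (!pred(item)) break // then stop without touching the next element
        }
    }

// Hypothetical helper: counts how many source elements one full iteration pulls.
fun pulledCount(): Int {
    var pulled = 0
    sequenceOf(1, 2, 3, 4, 5)
        .onEach { pulled++ }
        .takeWhileInclusivePerIterator { it < 3 }
        .toList()
    return pulled
}
```

Iterating the same result twice now gives `[1, 2, 3]` both times, and only three source elements are pulled.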
Sequences consumed in parallel
As an example, here is a first attempt at consuming a sequence in parallel:
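// Assumes the pre-1.0 kotlinx.coroutines API, where launch needed no CoroutineScope
import kotlinx.coroutines.experimental.*

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        // All ten coroutines share one unsynchronized iterator
        repeat(10) { launchProcessor(it, iterator) }
    }
}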
This code prints:
[ForkJoinPool.commonPool-worker-2] Processor #1 received 1
[ForkJoinPool.commonPool-worker-1] Processor #0 received 0
[ForkJoinPool.commonPool-worker-3] Processor #2 received 2
[ForkJoinPool.commonPool-worker-2] Processor #3 received 3
[ForkJoinPool.commonPool-worker-1] Processor #4 received 3
[ForkJoinPool.commonPool-worker-3] Processor #5 received 3
[ForkJoinPool.commonPool-worker-1] Processor #7 received 5
[ForkJoinPool.commonPool-worker-2] Processor #6 received 4
[ForkJoinPool.commonPool-worker-1] Processor #9 received 7
[ForkJoinPool.commonPool-worker-3] Processor #8 received 6
This is of course not what we want, since some numbers were consumed more than once.
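The duplicates most likely come from a check-then-act race. The plain iterator reads and advances its internal position without synchronization, so several workers can read the same position before any of them advances it. One way to make a shared iterator safe to consume from several threads is to guard each take with a lock. The sketch below does this with plain JVM threads instead of coroutines. `SynchronizedIterator`, `nextOrNull` and `consumeInParallel` are hypothetical helpers, not library API. The key design choice is that `hasNext()` and `next()` happen under one lock; locking them separately would reintroduce the race.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Hypothetical wrapper that serializes access to a non-thread-safe Iterator.
class SynchronizedIterator<T>(private val delegate: Iterator<T>) {
    private val lock = Any()

    // Check and take under one lock; returns null once the iterator is exhausted.
    fun nextOrNull(): T? = synchronized(lock) {
        if (delegate.hasNext()) delegate.next() else null
    }
}

// Hypothetical helper: several threads drain one shared iterator.
fun consumeInParallel(values: Sequence<Int>, workers: Int): List<Int> {
    val shared = SynchronizedIterator(values.iterator())
    val received = ConcurrentLinkedQueue<Int>()
    val threads = List(workers) { id ->
        thread(name = "processor-$id") {
            // Each worker keeps pulling until nothing is left.
            while (true) {
                val item = shared.nextOrNull() ?: break
                received.add(item)
            }
        }
    }
    threads.forEach { it.join() }
    return received.toList()
}
```

With this wrapper every element is handed to exactly one worker, although the order in which workers receive them still varies from run to run.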
Enter channels
If we use channels instead, we can write:
// Assumes the pre-1.0 kotlinx.coroutines API
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
And the output is:
[ForkJoinPool.commonPool-worker-2] Processor #0 received 1
[ForkJoinPool.commonPool-worker-2] Processor #0 received 2
[ForkJoinPool.commonPool-worker-1] Processor #1 received 3
[ForkJoinPool.commonPool-worker-2] Processor #2 received 4
[ForkJoinPool.commonPool-worker-1] Processor #3 received 5
[ForkJoinPool.commonPool-worker-2] Processor #4 received 6
[ForkJoinPool.commonPool-worker-2] Processor #0 received 7
[ForkJoinPool.commonPool-worker-1] Processor #1 received 8
[ForkJoinPool.commonPool-worker-1] Processor #2 received 9
[ForkJoinPool.commonPool-worker-2] Processor #3 received 10
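The code above appears to target the pre-1.0 kotlinx.coroutines API. It calls `launch` and `produce` without a `CoroutineScope`, and the output shows `ForkJoinPool.commonPool` worker threads. A rough equivalent with the current structured-concurrency API might look like the sketch below, assuming kotlinx.coroutines 1.x on the classpath. The bounded `limit` parameter and the `fanOut` helper are assumptions added so the example terminates on its own instead of relying on `delay` and `cancel`. Following the fan-out example in the kotlinx.coroutines guide, processors iterate with a `for` loop rather than `consumeEach`. `consumeEach` cancels the channel when it finishes, which matters when several coroutines share one channel.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Bounded producer; the limit is an assumption so the demo ends by itself.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers(limit: Int): ReceiveChannel<Int> = produce<Int> {
    for (x in 1..limit) send(x)
}

// Receives until the channel is closed; each element goes to exactly one receiver.
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>, sink: MutableCollection<Int>) =
    launch(Dispatchers.Default) {
        for (x in channel) {
            println("[${Thread.currentThread().name}] Processor #$id received $x")
            sink.add(x)
        }
    }

// Hypothetical helper: runs the fan-out and returns everything the processors received.
fun fanOut(limit: Int, workers: Int): List<Int> = runBlocking {
    val received = ConcurrentLinkedQueue<Int>()
    val producer = produceNumbers(limit)
    List(workers) { launchProcessor(it, producer, received) }.joinAll()
    received.toList()
}
```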
Furthermore, we can implement takeWhileInclusive for channels like this:
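// Assumes the pre-1.0 kotlinx.coroutines API
import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.channels.*

fun <E> ReceiveChannel<E>.takeWhileInclusive(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}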
It works as expected.
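One detail in the channel version above is worth checking. When the predicate fails, `consumeEach` still receives one more element before `return@produce`, and that element is dropped. With a channel shared among several consumers, the other consumers lose it too. In current kotlinx.coroutines releases, `consumeEach` also cancels the source channel once its block exits, which would shut the channel down for every other consumer. The sketch below avoids both problems, assuming the current 1.x API; the function name `takeWhileInclusiveFrom` and its explicit `source` parameter are hypothetical. It sends first and checks afterwards, and it iterates with a `for` loop, which leaves the source open on `break`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical sketch for current kotlinx.coroutines; not the code from the question.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E> CoroutineScope.takeWhileInclusiveFrom(
    source: ReceiveChannel<E>,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce<E> {
    // A plain for loop does not cancel `source` when we break out of it.
    for (item in source) {
        send(item)                  // the first failing element is still forwarded
        if (!predicate(item)) break // stop before receiving anything else
    }
}
```

With a source of `1..6` and `{ it < 3 }`, the result channel yields `1, 2, 3`, and `4` is still available to the next receiver of the source.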