这个 takeWhileInclusive 的实现安全吗?

Is this implementation of takeWhileInclusive safe?

我发现以下实现包含 takeWhile(发现 here

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

问题是我不是 100% 相信如果在 并行序列 上使用它是安全的。

我担心的是我们会依赖 shouldContinue 变量来知道何时停止,但我们没有同步它的访问。

有什么见解吗?

以下是我目前所了解的内容。

问题澄清

问题不清楚。没有 平行序列 这样的东西,我可能把它们和 Java's parallel streams 搞混了。我的意思是 同时使用 .

的序列

序列是同步的

正如@LouisWasserman 在评论中指出的那样,序列不是为并行执行而设计的。特别是 SequenceBuilder 被注释为 @RestrictSuspension。引用自 Kotlin Coroutine 回购:

It means that no SequenceBuilder extension of lambda in its scope can invoke suspendContinuation or other general suspending function

话虽如此,正如@MarkoTopolnik 所评论的那样,它们仍然可以像任何其他对象一样在并行程序中使用。

并行使用的序列

作为示例,这是并行使用序列的第一次尝试

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

此代码打印:

[ForkJoinPool.commonPool-worker-2] Processor #1 received 1

[ForkJoinPool.commonPool-worker-1] Processor #0 received 0

[ForkJoinPool.commonPool-worker-3] Processor #2 received 2

[ForkJoinPool.commonPool-worker-2] Processor #3 received 3

[ForkJoinPool.commonPool-worker-1] Processor #4 received 3

[ForkJoinPool.commonPool-worker-3] Processor #5 received 3

[ForkJoinPool.commonPool-worker-1] Processor #7 received 5

[ForkJoinPool.commonPool-worker-2] Processor #6 received 4

[ForkJoinPool.commonPool-worker-1] Processor #9 received 7

[ForkJoinPool.commonPool-worker-3] Processor #8 received 6

这当然不是我们想要的。因为有些数字被消耗了两次。

输入频道

另一方面,如果我们要使用频道,我们可以这样写:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

则输出为:

[ForkJoinPool.commonPool-worker-2] Processor #0 received 1

[ForkJoinPool.commonPool-worker-2] Processor #0 received 2

[ForkJoinPool.commonPool-worker-1] Processor #1 received 3

[ForkJoinPool.commonPool-worker-2] Processor #2 received 4

[ForkJoinPool.commonPool-worker-1] Processor #3 received 5

[ForkJoinPool.commonPool-worker-2] Processor #4 received 6

[ForkJoinPool.commonPool-worker-2] Processor #0 received 7

[ForkJoinPool.commonPool-worker-1] Processor #1 received 8

[ForkJoinPool.commonPool-worker-1] Processor #2 received 9

[ForkJoinPool.commonPool-worker-2] Processor #3 received 10

此外,我们可以像这样为渠道实施 takeWhileInclusive 方法:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

它按预期工作。