将通道桥接到序列

Bridge channel to a sequence

此代码基于Coroutines guide example: Fan-out

val inputProducer = produce<String>(CommonPool) {
    (0..inputArray.size).forEach {
        send(inputArray[it])
    }
}

val resultChannel = Channel<Result>(10)

repeat(threadCount) {
    launch(CommonPool) {
        inputProducer.consumeEach {
            resultChannel.send(getResultFromData(it))
        }
    }
}

创建能够提供结果的 Sequence<Result> 的正确方法是什么?

您可以获取通道 .iterator() from the ReceiveChannel,然后将该通道迭代器包装到 Sequence<T> 中,实现其正常的 Iterator<T> 阻塞等待每个请求的结果:

fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) =
    Sequence {
        val iterator = iterator()
        object : AbstractIterator<T>() {
            override fun computeNext() = runBlocking(context) {
                if (!iterator.hasNext())
                    done() else
                    setNext(iterator.next())
            }
        }
    }

val resultSequence = resultChannel.asSequence(CommonPool)

我遇到了同样的问题,最后我想出了这个 unusual/convoluted 解决方案:

fun Channel<T>.asSequence() : Sequence<T> {
    val itr = this.iterator()
    return sequence<Int> {
      while ( runBlocking {itr.hasNext()} ) yield( runBlocking<T> { itr.next() } )
    }
}

我认为它不是特别有效(使用@hotkey 提供的那个),但它至少对我有一定的吸引力。