Is it safe to use unsynchronized mutable state with Kotlin Flow?

Is the following code safe, and why?

val flow: Flow<String> = ...
val allStrings = mutableListOf<String>()
var sum = 0

flow.transform {
  allStrings += it
  emit(it.toInt())
}.collect {
  sum += it
}

The following test demonstrates that the body of collect {} is called from different threads:

val ctx = newFixedThreadPoolContext(32, "my-context")

runBlocking(ctx) {
    val f = flow<Int> {
        (1 .. 1000).forEach {
            emit(it)
        }
    }
    
    var t: Thread? = null
    f.collect {
        delay(1)
        // this requirement will fail
        if (t == null) t = Thread.currentThread() else require(t == Thread.currentThread())
    }
}

And here is a test for publication:

fun main(args: Array<String>) {
    val ctx = newFixedThreadPoolContext(32, "my-context")
    runBlocking(ctx) {
        val f = flow<Int> {
            (1 .. 1000000).forEach {
                emit(it)
            }
        }
        var c = 0
        f.transform {
            c += 1
            boo()
            c += 1
            emit(it)
            c += 1
        }.collect {
            c += 1
            boo()
            c += 1
        }
        println(c) // prints 5_000_000
    }
}
suspend fun boo() {
    withContext(Dispatchers.IO) {
    }
}

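So it seems Kotlin Flow ensures publication between coroutine invocations, but is this intentional (or even documented), or is it a side effect of the implementation?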

Yes, the code is safe. By default, flows are sequential:

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator (collect). No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator after.

Kotlin Flows are based on suspending functions; they are completely sequential, but not single-threaded.
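To make the "upstream to downstream, one value at a time" ordering concrete, here is a minimal sketch that records the order in which the operator bodies run. The helper name traceSequentialFlow and the events list are made up for illustration; the empty withContext(Dispatchers.IO) call mirrors boo() from the question and only exists so that the coroutine may resume on a different thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not from the question): runs a small flow and
// records the order in which transform {} and collect {} are invoked.
fun traceSequentialFlow(): List<String> = runBlocking(Dispatchers.Default) {
    val events = mutableListOf<String>() // deliberately unsynchronized
    flowOf("1", "2", "3")
        .transform {
            events += "transform $it"
            // Suspend and possibly resume on another thread, like boo() in the question.
            withContext(Dispatchers.IO) { }
            emit(it.toInt()) // suspends until the collect {} body has finished
        }
        .collect {
            events += "collect $it"
        }
    events
}

fun main() {
    // prints [transform 1, collect 1, transform 2, collect 2, transform 3, collect 3]
    println(traceSequentialFlow())
}
```

The bodies strictly alternate because emit does not return until the downstream collector has processed the value, so the two lambdas never run at the same time even if the threads differ.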

Therefore it seems kotlin flow ensures publication between coroutine invocations but is this intentional (or even documented) or implementation side effect?

Here it says:

a flow is a type that can emit multiple values sequentially

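As far as I understand, a new value is not collected until the previous one has been processed, no matter how many threads are involved.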

As Roman mentioned in his comment, here is a good article about sequential execution. There is a great quote in it:

Even though a coroutine in Kotlin can execute on multiple threads it is just like a thread from a standpoint of mutable state. No two actions in the same coroutine can be concurrent. And just like with threads you should avoid sharing your mutable state between coroutines or you’ll have to worry about synchronization yourself. Avoid sharing mutable state. Confine each mutable object to either a single thread or to a single coroutine and sleep well.

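This applies to Flows because the collection of a Flow works directly in the coroutine that calls the terminal operator. No new coroutines are launched by default.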
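The "by default" matters. As a hedged counter-example, the sketch below uses buffer(), which is documented to run the upstream flow in a separate coroutine. State touched on both sides of it is then shared between two coroutines, so the sketch switches to an AtomicInteger; with a plain var, increments could be lost. The helper name countAcrossBuffer and the counter calls are assumptions made for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: counts operator invocations on both sides of buffer().
fun countAcrossBuffer(n: Int): Int = runBlocking(Dispatchers.Default) {
    // Updated from two coroutines that may run concurrently, so it must be thread-safe.
    val calls = AtomicInteger(0)
    (1..n).asFlow()
        .onEach { calls.incrementAndGet() }  // runs in the upstream (producer) coroutine
        .buffer()                            // decouples upstream from downstream
        .collect { calls.incrementAndGet() } // runs in the collecting coroutine
    calls.get()
}

fun main() {
    println(countAcrossBuffer(100_000)) // prints 200000
}
```

State that is read and written on only one side of such an operator stays confined to a single coroutine and needs no synchronization, which is the situation in the question's code.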