在 Kotlin Native 中,如何在单独的线程中保留一个对象,并在不使用 C 指针的情况下从任何其他 thead 改变它的状态?

In Kotlin Native, how to keep an object around in a separate thread, and mutate its state from any other thead without using C pointers?

我正在探索 Kotlin Native 并且有一个程序包含一堆 Workers 做并发的事情 (运行 关于 Windows,但这是一个一般性问题)。

现在,我想添加简单的日志记录。一个组件,它通过将字符串作为新行附加到 在 'append' 模式下保持打开 的文件来简单地记录字符串。

(理想情况下,我只需要一个 "global" 函数...

fun log(text:String) {...} ] 

...我可以从任何地方打电话,包括从 "inside" 其他工作人员那里打电话,这样就可以了。这里的含义是,由于 Kotlin Native 关于在线程之间传递对象的规则(TLDR:你不应该四处传递可变对象。请参阅:https://github.com/JetBrains/kotlin-native/blob/master/CONCURRENCY.md#object-transfer-and-freezing),这样做并非易事。 此外,我的日志函数理想情况下会接受任何 frozen 对象。 )


我想出的解决方案是使用 DetachedObjectGraph:

首先,我创建一个分离的记录器对象

val loggerGraph = DetachedObjectGraph { FileLogger("/foo/mylogfile.txt")}

然后使用 loggerGraph.asCPointer() ( asCPointer() ) 得到分离图的 COpaquePointer:

val myPointer = loggerGraph.asCPointer()

现在我可以将这个指针传递给工人(通过 producer lambda of the Worker's execute function ), and use it there. Or I can store the pointer in a @ThreadLocal 全局变量


对于写入文件的代码,每当我想记录一行时,我必须再次从指针创建一个 DetachedObjectGraph 对象, attach() 它是为了获得对我的 fileLogger 对象的引用:

val fileLogger = DetachedObjectGraph(myPointer).attach()

现在我可以在记录器上调用记录函数了:

fileLogger.log("My log message")

这是我在查看可用于 Kotlin Native 并发的 API(从 Kotlin 1.3.61 开始)时想到的, 但是 我想知道更好的方法是什么(使用 Kotlin,而不是求助于 C)。显然,为写入的每一行都创建一个 DetachedObjectGraph 对象是不好的。


可以用更一般的方式提出这个问题:如何在单独的线程(或工作线程)中保持可变资源打开,并向其发送消息。

旁注:Coroutines 真正使用线程可以解决这个问题,但问题是如何使用当前可用的 API (Kotlin 1.3.61) 解决这个任务。

您采用的方法几乎是正确的,也是应该采用的方法。

我要补充的是,而不是四处传递指针。您应该传递一个冻结的 FileLogger,它将在内部保存对 AtomicRef<DetachedObjectGraph> 的引用,附加和分离应该在内部完成。特别是因为 DetachedObjectGraphs 一旦附加就无效了。

您绝对不应该以问题中提供的方式使用 DetachedObjectGraph。没有什么可以阻止您尝试附加到多个线程,或者如果您传递相同的指针,则尝试附加到附加到它的另一个线程后的无效线程。

正如 Dominic 提到的,您可以将 DetachedObjectGraph 保留在 AtomicReference 中。但是,如果您要将 DetachedObjectGraph 保留在 AtomicReference 中,请确保类型为 AtomicRef<DetachedObjectGraph?> 且处于忙循环状态,而 DetachedObjectGraph 为空。这将防止相同的 DetachedObjectGraph 被多个线程使用。确保将其设置为 null,并以原子方式重新填充它。

但是,FileLogger 是否需要可变?如果您正在写入文件,则似乎并非如此。即使是这样,我也会将可变对象隔离到一个单独的工作程序并向它发送日志消息,而不是在 AtomicRef 中执行 DetachedObjectGraph

根据我的经验,DetachedObjectGraph 在生产代码中非常不常见。我们目前不在任何地方使用它。

要将可变状态隔离到 Worker,像这样:


class MutableThing<T:Any>(private val worker:Worker = Worker.start(), producer:()->T){
    private val arStable = AtomicReference<StableRef<T>?>(null)
    init {
        worker.execute(TransferMode.SAFE, {Pair(arStable, producer).freeze()}){
            it.first.value = StableRef.create(it.second()).freeze()
        }
    }
    fun <R> access(block:(T)->R):R{
        return worker.execute(TransferMode.SAFE, {Pair(arStable, block).freeze()}){
            it.second(it.first.value!!.get())
        }.result
    }
}

object Log{
    private val fileLogger = MutableThing { FileLogger() }

    fun log(s:String){
        fileLogger.access { fl -> fl.log(s) }
    }
}

class FileLogger{
    fun log(s:String){}
}

MutableThing 在内部使用 StableRefproducer 生成您想要隔离的可变状态。要记录某些内容,请调用 Log.log,这将最终调用可变的 FileLogger.

看一个基本的例子MutableThing,运行下面测试一下:

@Test
fun goIso(){
    val mt = MutableThing { mutableListOf("a", "b")}
    val workers = Array(4){Worker.start()}
    val futures = mutableListOf<Future<*>>()
    repeat(1000) { rcount ->
        val future = workers[rcount % workers.size].execute(
            TransferMode.SAFE,
            { Pair(mt, rcount).freeze() }
        ) { pair ->
            pair.first.access {
                val element = "ttt ${pair.second}"
                println(element)
                it.add(element)
            }
        }
        futures.add(future)
    }

    futures.forEach { it.result }

    workers.forEach { it.requestTermination() }

    mt.access {
        println("size: ${it.size}")
    }
}