运行 while 并行循环

Run while loop in parallel

我有一个大集合(+90 000 个对象),我想 运行 while 在其上并行循环,我的函数源如下

val context = newSingleThreadAsyncContext()
        return KtxAsync.async(context)  {
            val fields = regularMazeService.generateFields(colsNo, rowsNo)

        val time = measureTimeMillis {
            withContext(newAsyncContext(10)) {
                while (availableFieldsWrappers.isNotEmpty()) {

                    val wrapper = getFirstShuffled(availableFieldsWrappers.lastIndex)
                            .let { availableFieldsWrappers[it] }

                    if (wrapper.neighborsIndexes.isEmpty()) {
                        availableFieldsWrappers.remove(wrapper)
                        continue
                    }

                    val nextFieldIndex = getFirstShuffled(wrapper.neighborsIndexes.lastIndex)
                            .let {
                                val fieldIndex = wrapper.neighborsIndexes[it]
                                wrapper.neighborsIndexes.removeAt(it)
                                fieldIndex
                            }

                    if (visitedFieldsIndexes.contains(nextFieldIndex)) {
                        wrapper.neighborsIndexes.remove(nextFieldIndex)
                        fields[nextFieldIndex].neighborFieldsIndexes.remove(wrapper.index)
                        continue
                    }

                    val nextField = fields[nextFieldIndex]
                    availableFieldsWrappers.add(FieldWrapper(nextField, nextFieldIndex))
                    visitedFieldsIndexes.add(nextFieldIndex)

                    wrapper.field.removeNeighborWall(nextFieldIndex)
                    nextField.removeNeighborWall(wrapper.index)
                }
            }
        }
        Gdx.app.log("maze-time", "$time")

在class

之上
private val availableFieldsWrappers = Collections.synchronizedList(mutableListOf<FieldWrapper>())
private val visitedFieldsIndexes = Collections.synchronizedList(mutableListOf<Int>())

我测试了几次结果如下:

我做错了什么?

  1. 您正在使用 Java 标准库中的 Collections.synchronizedList,returns 一个列表包装器利用阻塞 synchronized 机制来确保线程安全。这种机制与协程不兼容,因为它会阻止其他线程访问集合,直到操作完成。从多个协程访问数据时,通常应该使用非阻塞并发集合,或者使用 non-blocking mutex.

  2. 保护共享数据
  3. List.contains 随着添加的元素越来越多,会变得越来越慢(O(n))。您应该使用 visitedFieldsIndexes 的集合而不是列表。只需确保使用互斥锁保护它或使用并发变体即可。类似地,从 availableFieldsWrappers 中删除具有随机索引的值的成本非常高 - 相反,您可以将列表洗牌一次并使用简单的迭代。

  4. 您没有重用协程上下文。通常,您可以创建一次异步上下文并重用其实例,而不是每次需要协程时都创建一个新的线程池。您应该只调用并分配 newAsyncContext(10) 的结果一次,然后在整个应用程序中重复使用它。

  5. 您目前编写的代码并没有很好地利用协程。不要将协程调度程序视为一个线程池,您可以在其中并行启动 N 个大任务(即您的 while availableFieldsWrappers.isNotEmpty 循环),您应该将其视为成百上千个小任务的执行者,并调整您的代码因此。我认为您可以通过引入例如重写代码来完全避免 available/visited 集合Kotlin flows 或仅处理较小部分逻辑的多个 KtxAsync.async/KtxAsync.launch 调用。

  6. 除非某些函数挂起或在底层使用协程,否则您根本就没有真正利用异步上下文的多线程。 withContext(newAsyncContext(10)) 启动一个按顺序处理整个逻辑的协程,只利用一个线程。有关如何重写代码的一些想法,请参见 4.。尝试收集(或只是打印)线程哈希值和名称,看看您是否使用了所有线程。