运行 while 并行循环
Run while loop in parallel
我有一个大集合(+90 000 个对象),我想 运行 while 在其上并行循环,我的函数源如下
val context = newSingleThreadAsyncContext()
return KtxAsync.async(context) {
val fields = regularMazeService.generateFields(colsNo, rowsNo)
val time = measureTimeMillis {
withContext(newAsyncContext(10)) {
while (availableFieldsWrappers.isNotEmpty()) {
val wrapper = getFirstShuffled(availableFieldsWrappers.lastIndex)
.let { availableFieldsWrappers[it] }
if (wrapper.neighborsIndexes.isEmpty()) {
availableFieldsWrappers.remove(wrapper)
continue
}
val nextFieldIndex = getFirstShuffled(wrapper.neighborsIndexes.lastIndex)
.let {
val fieldIndex = wrapper.neighborsIndexes[it]
wrapper.neighborsIndexes.removeAt(it)
fieldIndex
}
if (visitedFieldsIndexes.contains(nextFieldIndex)) {
wrapper.neighborsIndexes.remove(nextFieldIndex)
fields[nextFieldIndex].neighborFieldsIndexes.remove(wrapper.index)
continue
}
val nextField = fields[nextFieldIndex]
availableFieldsWrappers.add(FieldWrapper(nextField, nextFieldIndex))
visitedFieldsIndexes.add(nextFieldIndex)
wrapper.field.removeNeighborWall(nextFieldIndex)
nextField.removeNeighborWall(wrapper.index)
}
}
}
Gdx.app.log("maze-time", "$time")
在class
之上
private val availableFieldsWrappers = Collections.synchronizedList(mutableListOf<FieldWrapper>())
private val visitedFieldsIndexes = Collections.synchronizedList(mutableListOf<Int>())
我测试了几次结果如下:
- 1 个线程 - 21213 毫秒
- 5 个线程 - 27894 毫秒
- 10 个线程 - 21494 毫秒
- 15 个线程 - 20986 毫秒
我做错了什么?
您正在使用 Java 标准库中的 Collections.synchronizedList
,returns 一个列表包装器利用阻塞 synchronized
机制来确保线程安全。这种机制与协程不兼容,因为它会阻止其他线程访问集合,直到操作完成。从多个协程访问数据时,通常应该使用非阻塞并发集合,或者使用 non-blocking mutex.
保护共享数据
List.contains
随着添加的元素越来越多,会变得越来越慢(O(n))。您应该使用 visitedFieldsIndexes
的集合而不是列表。只需确保使用互斥锁保护它或使用并发变体即可。类似地,从 availableFieldsWrappers
中删除具有随机索引的值的成本非常高 - 相反,您可以将列表洗牌一次并使用简单的迭代。
您没有重用协程上下文。通常,您可以创建一次异步上下文并重用其实例,而不是每次需要协程时都创建一个新的线程池。您应该只调用并分配 newAsyncContext(10)
的结果一次,然后在整个应用程序中重复使用它。
您目前编写的代码并没有很好地利用协程。不要将协程调度程序视为一个线程池,您可以在其中并行启动 N 个大任务(即您的 while availableFieldsWrappers.isNotEmpty
循环),您应该将其视为成百上千个小任务的执行者,并调整您的代码因此。我认为您可以通过引入例如重写代码来完全避免 available/visited 集合Kotlin flows 或仅处理较小部分逻辑的多个 KtxAsync.async
/KtxAsync.launch
调用。
除非某些函数挂起或在底层使用协程,否则您根本就没有真正利用异步上下文的多线程。 withContext(newAsyncContext(10))
启动一个按顺序处理整个逻辑的协程,只利用一个线程。有关如何重写代码的一些想法,请参见 4.。尝试收集(或只是打印)线程哈希值和名称,看看您是否使用了所有线程。
我有一个大集合(+90 000 个对象),我想 运行 while 在其上并行循环,我的函数源如下
val context = newSingleThreadAsyncContext()
return KtxAsync.async(context) {
val fields = regularMazeService.generateFields(colsNo, rowsNo)
val time = measureTimeMillis {
withContext(newAsyncContext(10)) {
while (availableFieldsWrappers.isNotEmpty()) {
val wrapper = getFirstShuffled(availableFieldsWrappers.lastIndex)
.let { availableFieldsWrappers[it] }
if (wrapper.neighborsIndexes.isEmpty()) {
availableFieldsWrappers.remove(wrapper)
continue
}
val nextFieldIndex = getFirstShuffled(wrapper.neighborsIndexes.lastIndex)
.let {
val fieldIndex = wrapper.neighborsIndexes[it]
wrapper.neighborsIndexes.removeAt(it)
fieldIndex
}
if (visitedFieldsIndexes.contains(nextFieldIndex)) {
wrapper.neighborsIndexes.remove(nextFieldIndex)
fields[nextFieldIndex].neighborFieldsIndexes.remove(wrapper.index)
continue
}
val nextField = fields[nextFieldIndex]
availableFieldsWrappers.add(FieldWrapper(nextField, nextFieldIndex))
visitedFieldsIndexes.add(nextFieldIndex)
wrapper.field.removeNeighborWall(nextFieldIndex)
nextField.removeNeighborWall(wrapper.index)
}
}
}
Gdx.app.log("maze-time", "$time")
在class
之上private val availableFieldsWrappers = Collections.synchronizedList(mutableListOf<FieldWrapper>())
private val visitedFieldsIndexes = Collections.synchronizedList(mutableListOf<Int>())
我测试了几次结果如下:
- 1 个线程 - 21213 毫秒
- 5 个线程 - 27894 毫秒
- 10 个线程 - 21494 毫秒
- 15 个线程 - 20986 毫秒
我做错了什么?
您正在使用 Java 标准库中的
Collections.synchronizedList
,returns 一个列表包装器利用阻塞synchronized
机制来确保线程安全。这种机制与协程不兼容,因为它会阻止其他线程访问集合,直到操作完成。从多个协程访问数据时,通常应该使用非阻塞并发集合,或者使用 non-blocking mutex. 保护共享数据
List.contains
随着添加的元素越来越多,会变得越来越慢(O(n))。您应该使用visitedFieldsIndexes
的集合而不是列表。只需确保使用互斥锁保护它或使用并发变体即可。类似地,从availableFieldsWrappers
中删除具有随机索引的值的成本非常高 - 相反,您可以将列表洗牌一次并使用简单的迭代。您没有重用协程上下文。通常,您可以创建一次异步上下文并重用其实例,而不是每次需要协程时都创建一个新的线程池。您应该只调用并分配
newAsyncContext(10)
的结果一次,然后在整个应用程序中重复使用它。您目前编写的代码并没有很好地利用协程。不要将协程调度程序视为一个线程池,您可以在其中并行启动 N 个大任务(即您的
while availableFieldsWrappers.isNotEmpty
循环),您应该将其视为成百上千个小任务的执行者,并调整您的代码因此。我认为您可以通过引入例如重写代码来完全避免 available/visited 集合Kotlin flows 或仅处理较小部分逻辑的多个KtxAsync.async
/KtxAsync.launch
调用。除非某些函数挂起或在底层使用协程,否则您根本就没有真正利用异步上下文的多线程。
withContext(newAsyncContext(10))
启动一个按顺序处理整个逻辑的协程,只利用一个线程。有关如何重写代码的一些想法,请参见 4.。尝试收集(或只是打印)线程哈希值和名称,看看您是否使用了所有线程。