使用协程读取和复制文件

Read and copy file with Coroutines

我创建了以下应用程序来说明一些疑问。 My Example on the Github

在这个例子中,我将一个文件复制到另一个包中。

我的疑惑如下:

  1. 并行执行任务,是否可以return取消前完成的值?

  2. 为什么在 contentResolver.openInputStream (uri) 中出现消息“不适当的阻塞方法调用”,而我正在使用 IO 上下文?

  3. 当我读取要复制到输出的文件条目时,我总是检查作业状态,以便在取消此任务时立即停止,删除创建的输出文件并return取消异常,对吗?

  4. 我可以限定执行的任务量吗?

我的 onCreate:

private val listUri = mutableListOf<Uri>()
private val job = Job()

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    
    //get files from 1 to 40
    val packageName = "android.resource://${packageName}/raw/"
    for (i in 1..40) {
        listUri.add(Uri.parse("${packageName}file$i"))
    }
}

我的按钮操作:

  //Button action
   fun onClickStartTask(view: View) {
        var listNewPath = emptyList<String>()
        CoroutineScope(Main + job).launch {
            try {
                //shows something in the UI - progressBar
                withContext(IO) {
                    listNewPath = listUri.map { uri ->
                        async {
                            //path to file temp
                            val pathFileTemp =
                                "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                            val file = File(pathFileTemp)
                            val inputStream = contentResolver.openInputStream(uri)
                            inputStream?.use { input ->
                                FileOutputStream(file).use { output ->
                                    val buffer = ByteArray(1024)
                                    var read: Int = input.read(buffer)
                                    while (read != -1) {
                                        if (isActive) {
                                            output.write(buffer, 0, read)
                                            read = input.read(buffer)
                                        } else {
                                            input.close()
                                            output.close()
                                            file.deleteRecursively()
                                            throw CancellationException()
                                        }
                                    }
                                }
                            }
                            //If completed then it returns the new path.
                            return@async pathFileTemp
                        }
                    }.awaitAll()
                }
            } finally {
                //shows list complete in the UI
            }
        }
    }

我的取消作业按钮:

fun onClickCancelTask(view: View) {
    if (job.isActive) {
        job.cancelChildren()
        println("Cancel children")
    }
}

这将是执行任务的按钮操作。

谢谢大家的帮助。

我认为这是更好的方法

fun onClickStartTask(view: View) {
    var listNewPath = emptyList<String>()
    val copiedFiles = mutableListOf<File>()
    CoroutineScope(Dispatchers.Main + job).launch {
        try {
            //shows something in the UI - progressBar
            withContext(Dispatchers.IO) {
                listNewPath = listUri.map { uri ->
                    async {
                        //path to file temp
                        val pathFileTemp =
                                "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                        val file = File(pathFileTemp)
                        val inputStream = contentResolver.openInputStream(uri)
                        inputStream?.use { input ->
                            file.outputStream().use { output ->
                                copiedFiles.add(file)
                                input.copyTo(output, 1024)
                            }
                        }

                        //If completed then it returns the new path.
                        return@async pathFileTemp
                    }
                }.awaitAll()
            }
        } finally {
            //shows list complete in the UI
        }
    }
    job.invokeOnCompletion {
        it?.takeIf { it is CancellationException }?.let {
            GlobalScope.launch(Dispatchers.IO) {
                copiedFiles.forEach { file ->
                    file.delete()
                }
            }
        }
    }
}

回答 1. 和 4.:

为了划分并行任务并让它们独立完成(获取一些值,同时取消其余值),您需要使用 Channel,最好是 Flow。简化示例:

fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
   val workToDistribute = Channel<Whatever>()
   launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...

    repeat(concurrency) { // launch a specified number of worker coroutines
      launch { 
         for (task in workToDistribute) { // which process tasks in a loop
            val atomicResult = process(task)
            send(atomicResult) // and send results downstream to a Flow
         }
      }
   }
}

然后您可以一个接一个地处理结果,因为它们正在生成,等待整个流程完成,或者例如需要时只取其中一些: resultFlow.take(20).onEach { ... }.collectIn(someScope) 因为它是一个流,只有当有人开始收集(天冷)时它才会开始工作,这通常是一件好事。

整个事情可能会变得更短一些,因为你会发现一些更具体和实验性的功能(如产品)。它可以概括为这样的流运算符:

fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
    require(concurrency > 1) { "No sense with concurrency < 2" }
    return channelFlow {
        val inputChannel = produceIn(this)
        repeat(concurrency) {
            launch {
                for (input in inputChannel) send(transform(input))
            }
        }
    }
}

并使用:list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }

corotuines 团队正在考虑向 Flow 流中添加一系列并行运算符,但 AFAIK 目前还没有。