使用协程读取和复制文件
Read and copy file with Coroutines
我创建了以下应用程序来说明一些疑问。 My Example on the Github
在这个例子中,我将一个文件复制到另一个包中。
我的疑惑如下:
并行执行任务,是否可以return取消前完成的值?
为什么在 contentResolver.openInputStream (uri)
中出现消息“不适当的阻塞方法调用”,而我正在使用 IO 上下文?
当我读取要复制到输出的文件条目时,我总是检查作业状态,以便在取消此任务时立即停止,删除创建的输出文件并return取消异常,对吗?
我可以限定执行的任务量吗?
我的 onCreate:
private val listUri = mutableListOf<Uri>()
private val job = Job()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//get files from 1 to 40
val packageName = "android.resource://${packageName}/raw/"
for (i in 1..40) {
listUri.add(Uri.parse("${packageName}file$i"))
}
}
我的按钮操作:
//Button action
fun onClickStartTask(view: View) {
var listNewPath = emptyList<String>()
CoroutineScope(Main + job).launch {
try {
//shows something in the UI - progressBar
withContext(IO) {
listNewPath = listUri.map { uri ->
async {
//path to file temp
val pathFileTemp =
"${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
val file = File(pathFileTemp)
val inputStream = contentResolver.openInputStream(uri)
inputStream?.use { input ->
FileOutputStream(file).use { output ->
val buffer = ByteArray(1024)
var read: Int = input.read(buffer)
while (read != -1) {
if (isActive) {
output.write(buffer, 0, read)
read = input.read(buffer)
} else {
input.close()
output.close()
file.deleteRecursively()
throw CancellationException()
}
}
}
}
//If completed then it returns the new path.
return@async pathFileTemp
}
}.awaitAll()
}
} finally {
//shows list complete in the UI
}
}
}
我的取消作业按钮:
fun onClickCancelTask(view: View) {
if (job.isActive) {
job.cancelChildren()
println("Cancel children")
}
}
这将是执行任务的按钮操作。
谢谢大家的帮助。
我认为这是更好的方法
fun onClickStartTask(view: View) {
var listNewPath = emptyList<String>()
val copiedFiles = mutableListOf<File>()
CoroutineScope(Dispatchers.Main + job).launch {
try {
//shows something in the UI - progressBar
withContext(Dispatchers.IO) {
listNewPath = listUri.map { uri ->
async {
//path to file temp
val pathFileTemp =
"${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
val file = File(pathFileTemp)
val inputStream = contentResolver.openInputStream(uri)
inputStream?.use { input ->
file.outputStream().use { output ->
copiedFiles.add(file)
input.copyTo(output, 1024)
}
}
//If completed then it returns the new path.
return@async pathFileTemp
}
}.awaitAll()
}
} finally {
//shows list complete in the UI
}
}
job.invokeOnCompletion {
it?.takeIf { it is CancellationException }?.let {
GlobalScope.launch(Dispatchers.IO) {
copiedFiles.forEach { file ->
file.delete()
}
}
}
}
}
回答 1. 和 4.:
为了划分并行任务并让它们独立完成(获取一些值,同时取消其余值),您需要使用 Channel,最好是 Flow。简化示例:
fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
val workToDistribute = Channel<Whatever>()
launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...
repeat(concurrency) { // launch a specified number of worker coroutines
launch {
for (task in workToDistribute) { // which process tasks in a loop
val atomicResult = process(task)
send(atomicResult) // and send results downstream to a Flow
}
}
}
}
然后您可以一个接一个地处理结果,因为它们正在生成,等待整个流程完成,或者例如需要时只取其中一些:
resultFlow.take(20).onEach { ... }.collectIn(someScope)
因为它是一个流,只有当有人开始收集(天冷)时它才会开始工作,这通常是一件好事。
整个事情可能会变得更短一些,因为你会发现一些更具体和实验性的功能(如产品)。它可以概括为这样的流运算符:
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
require(concurrency > 1) { "No sense with concurrency < 2" }
return channelFlow {
val inputChannel = produceIn(this)
repeat(concurrency) {
launch {
for (input in inputChannel) send(transform(input))
}
}
}
}
并使用:list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }
corotuines 团队正在考虑向 Flow 流中添加一系列并行运算符,但 AFAIK 目前还没有。
我创建了以下应用程序来说明一些疑问。 My Example on the Github
在这个例子中,我将一个文件复制到另一个包中。
我的疑惑如下:
并行执行任务,是否可以return取消前完成的值?
为什么在
contentResolver.openInputStream (uri)
中出现消息“不适当的阻塞方法调用”,而我正在使用 IO 上下文?当我读取要复制到输出的文件条目时,我总是检查作业状态,以便在取消此任务时立即停止,删除创建的输出文件并return取消异常,对吗?
我可以限定执行的任务量吗?
我的 onCreate:
private val listUri = mutableListOf<Uri>()
private val job = Job()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//get files from 1 to 40
val packageName = "android.resource://${packageName}/raw/"
for (i in 1..40) {
listUri.add(Uri.parse("${packageName}file$i"))
}
}
我的按钮操作:
//Button action
fun onClickStartTask(view: View) {
var listNewPath = emptyList<String>()
CoroutineScope(Main + job).launch {
try {
//shows something in the UI - progressBar
withContext(IO) {
listNewPath = listUri.map { uri ->
async {
//path to file temp
val pathFileTemp =
"${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
val file = File(pathFileTemp)
val inputStream = contentResolver.openInputStream(uri)
inputStream?.use { input ->
FileOutputStream(file).use { output ->
val buffer = ByteArray(1024)
var read: Int = input.read(buffer)
while (read != -1) {
if (isActive) {
output.write(buffer, 0, read)
read = input.read(buffer)
} else {
input.close()
output.close()
file.deleteRecursively()
throw CancellationException()
}
}
}
}
//If completed then it returns the new path.
return@async pathFileTemp
}
}.awaitAll()
}
} finally {
//shows list complete in the UI
}
}
}
我的取消作业按钮:
fun onClickCancelTask(view: View) {
if (job.isActive) {
job.cancelChildren()
println("Cancel children")
}
}
这将是执行任务的按钮操作。
谢谢大家的帮助。
我认为这是更好的方法
fun onClickStartTask(view: View) {
var listNewPath = emptyList<String>()
val copiedFiles = mutableListOf<File>()
CoroutineScope(Dispatchers.Main + job).launch {
try {
//shows something in the UI - progressBar
withContext(Dispatchers.IO) {
listNewPath = listUri.map { uri ->
async {
//path to file temp
val pathFileTemp =
"${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
val file = File(pathFileTemp)
val inputStream = contentResolver.openInputStream(uri)
inputStream?.use { input ->
file.outputStream().use { output ->
copiedFiles.add(file)
input.copyTo(output, 1024)
}
}
//If completed then it returns the new path.
return@async pathFileTemp
}
}.awaitAll()
}
} finally {
//shows list complete in the UI
}
}
job.invokeOnCompletion {
it?.takeIf { it is CancellationException }?.let {
GlobalScope.launch(Dispatchers.IO) {
copiedFiles.forEach { file ->
file.delete()
}
}
}
}
}
回答 1. 和 4.:
为了划分并行任务并让它们独立完成(获取一些值,同时取消其余值),您需要使用 Channel,最好是 Flow。简化示例:
fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
val workToDistribute = Channel<Whatever>()
launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...
repeat(concurrency) { // launch a specified number of worker coroutines
launch {
for (task in workToDistribute) { // which process tasks in a loop
val atomicResult = process(task)
send(atomicResult) // and send results downstream to a Flow
}
}
}
}
然后您可以一个接一个地处理结果,因为它们正在生成,等待整个流程完成,或者例如需要时只取其中一些:
resultFlow.take(20).onEach { ... }.collectIn(someScope)
因为它是一个流,只有当有人开始收集(天冷)时它才会开始工作,这通常是一件好事。
整个事情可能会变得更短一些,因为你会发现一些更具体和实验性的功能(如产品)。它可以概括为这样的流运算符:
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
require(concurrency > 1) { "No sense with concurrency < 2" }
return channelFlow {
val inputChannel = produceIn(this)
repeat(concurrency) {
launch {
for (input in inputChannel) send(transform(input))
}
}
}
}
并使用:list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }
corotuines 团队正在考虑向 Flow 流中添加一系列并行运算符,但 AFAIK 目前还没有。