如何让地图等到当前索引项完成处理,然后使用 RxJava 获取下一项进行处理?
How to make the map wait till the current index item is finished processing and then take the next item for processing using RxJava?
我正在尝试将输入流转换为文件。因此,当用户选择 1 张图像时,一切正常。但是当用户选择多个图像时,例如 4 那么下面的代码没有按预期工作;我在日志语句中只看到 2 个文件路径。
compositeDisposable.add(Observable.fromIterable(inputStreamList)
.map {
FileUtils.saveInputStreamToFile(it, directory, 500)
}.toList()
.toObservable()
.subscribeOn(schedulerProvider.io())
.subscribe({
it.forEach {file->
Log.d("TAG", "Path ${file.path}")
}
Log.d("TAG", "Size ${it.size}")
}, {
})
)
这是 saveInputStreamToFile 方法
fun saveInputStreamToFile(input: InputStream, directory: File, height: Int): File? {
val currentTime = dateFormat.format(Date())
val imageName = "_$currentTime"
val temp = File(directory.path + File.separator + "flab$file$for$processing")
try {
val final = File(directory.path + File.separator + imageName + ".$IMAGE_JPG")
val output = FileOutputStream(temp)
try {
val buffer = ByteArray(4 * 1024) // or other buffer size
var read: Int = input.read(buffer)
while (read != -1) {
output.write(buffer, 0, read)
read = input.read(buffer)
}
output.flush()
saveBitmap(decodeFile(temp, height)!!, final.path, IMAGE_JPG, 80)
return final
} finally {
output.close()
temp.delete()
}
} finally {
input.close()
}
}
我希望仅在当前输入流转换为文件后才获取下一个输入流。如何做到这一点?请帮助
在 RX 中你可以这样做:
Observable.fromCallable {
return inputStreamList.map {
FileUtils.saveInputStreamToFile(it, directory, 500)
}.toList()
}.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(object : Observer<List<File>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(files: List<File>) {
//here you can iterate and complete do your work
files.forEach { file ->
Log.d("TAG", "Path ${file.path}")
}
Log.d("TAG", "Size ${files.size}")
}
override fun onError(e: Throwable) {
}
})
这将确保您的工作将在后台同步完成,并且您将按照指定在 MainThread onNext()
中观察您的结果。
确保使用包 io.reactivex
编辑:
摆脱你的 RX 包并在 build.gradle
中设置这些
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
我正在尝试将输入流转换为文件。因此,当用户选择 1 张图像时,一切正常。但是当用户选择多个图像时,例如 4 那么下面的代码没有按预期工作;我在日志语句中只看到 2 个文件路径。
compositeDisposable.add(Observable.fromIterable(inputStreamList)
.map {
FileUtils.saveInputStreamToFile(it, directory, 500)
}.toList()
.toObservable()
.subscribeOn(schedulerProvider.io())
.subscribe({
it.forEach {file->
Log.d("TAG", "Path ${file.path}")
}
Log.d("TAG", "Size ${it.size}")
}, {
})
)
这是 saveInputStreamToFile 方法
fun saveInputStreamToFile(input: InputStream, directory: File, height: Int): File? {
val currentTime = dateFormat.format(Date())
val imageName = "_$currentTime"
val temp = File(directory.path + File.separator + "flab$file$for$processing")
try {
val final = File(directory.path + File.separator + imageName + ".$IMAGE_JPG")
val output = FileOutputStream(temp)
try {
val buffer = ByteArray(4 * 1024) // or other buffer size
var read: Int = input.read(buffer)
while (read != -1) {
output.write(buffer, 0, read)
read = input.read(buffer)
}
output.flush()
saveBitmap(decodeFile(temp, height)!!, final.path, IMAGE_JPG, 80)
return final
} finally {
output.close()
temp.delete()
}
} finally {
input.close()
}
}
我希望仅在当前输入流转换为文件后才获取下一个输入流。如何做到这一点?请帮助
在 RX 中你可以这样做:
Observable.fromCallable {
return inputStreamList.map {
FileUtils.saveInputStreamToFile(it, directory, 500)
}.toList()
}.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(object : Observer<List<File>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(files: List<File>) {
//here you can iterate and complete do your work
files.forEach { file ->
Log.d("TAG", "Path ${file.path}")
}
Log.d("TAG", "Size ${files.size}")
}
override fun onError(e: Throwable) {
}
})
这将确保您的工作将在后台同步完成,并且您将按照指定在 MainThread onNext()
中观察您的结果。
确保使用包 io.reactivex
编辑:
摆脱你的 RX 包并在 build.gradle
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'