Android 工作人员 - 在重试期间更新和保留状态
Android worker - update and preserve state across retries
Kotlin/Android新手看这里:)。我正在使用 CoroutineWorker 进行分块上传,但没有看到一种内置的方法来为我的工作人员维护状态,以防发生重试,但我很难相信这样的事情会丢失...
我的用例如下:
- 使用要作为输入数据上传的文件的路径创建工作请求
- Worker 遍历文件并分块执行上传。正在跟踪最新上传的
chunkIndex
。
- 万一发生错误和随后的
Retry()
,工作人员 以某种方式检索 当前块索引并恢复,而不是再次从头开始。
所以基本上,我真的只需要保留那个 chunkIndex
标志。我研究了设置进度,但这似乎在重试时成功或失败(一次成功,另一次尝试不可用)。
override suspend fun doWork(): Result {
try {
// TODO check if we are resuming with a given chunk index
chunkIndex = ...
// do the work
performUpload(...)
return Result.success()
} catch (e: Exception) {
// TODO cache the chunk index
return Result.retry()
}
}
我是否忽略了什么,或者我真的必须将该索引存储在 worker 之外吗?
您有一个很好的用例,但不幸的是 您无法在 Worker
class 中缓存数据或将数据传递给下一个 Worker
对象重试! 如您所料,您必须将索引存储在 WorkManager
提供的结构之外!
长答案,
Worker
对象可以接收和return数据。它可以从 getInputData()
方法访问数据。如果你 chain tasks,一个工人的输出可以为下一个工人输入。这可以通过 returning Result.success(output)
来完成(见下面的代码)
public Result doWork() {
int chunkIndex = upload();
//...set the output, and we're done!
Data output = new Data.Builder()
.putInt(KEY_RESULT, result)
.build();
return Result.success(output);
}
所以问题是我们无法 return 重试案例的数据,只有失败和成功的案例! (Result.retry(Data data)
缺少方法!)
参考:官方documentation and API.
正如 GB 的回答中所述,似乎没有办法在 worker 中缓存数据,或者做一个 Result.retry(data)
。我最后只是用 SharedPreferences
做了一个快速破解。
下面的解决方案。对它持保留态度,我总共有大约 10 个小时的 Kotlin 知识;)
var latestChunkIndex = -1
override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
try {
// get cached entry (simplified - no checking for fishy status or anything)
val transferId = id.toString()
var uploadInfo: UploadInfo = TransferCache.tryGetUpload(applicationContext, transferId) ?: TransferCache.registerUpload(applicationContext, transferId, TransferStatus.InProgress)
if(uploadInfo.status != TransferStatus.InProgress) {
TransferCache.setUploadStatus(applicationContext, transferId, TransferStatus.InProgress)
}
// resolve the current chunk - this will allow us to resume in case we're retrying
latestChunkIndex = uploadInfo.latestChunkIndex
// do the actual work
upload()
// update status and complete
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Success)
Result.success()
} catch (e: Exception) {
if (runAttemptCount > 20) {
// give up
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Error)
Result.failure()
}
// update status and schedule retry
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Paused)
Result.retry()
}
}
在我的 upload
函数中,我只是在跟踪我的缓存(我也可以在 doWork
方法的异常处理程序中执行此操作,但我将使用缓存状态检查的条目也是如此,而且很便宜):
private suspend fun upload() {
while ((latestChunkIndex + 1) * defaultChunkSize < fileSize) {
// doing the actual upload
...
// increment chunk number and store as progress
latestChunkIndex += 1
TransferCache.cacheUploadProgress(applicationContext, id.toString(), latestChunkIndex)
}
}
和 TransferCache
看起来像这样(请注意那里 没有 内务处理,所以如果不清理,它会继续增长!)
class UploadInfo() {
var transferId: String = ""
var status: TransferStatus = TransferStatus.Undefined
var latestChunkIndex: Int = -1
constructor(transferId: String) : this() {
this.transferId = transferId
}
}
object TransferCache {
private const val PREFERENCES_NAME = "${BuildConfig.APPLICATION_ID}.transfercache"
private val gson = Gson()
fun tryGetUpload(context: Context, transferId: String): UploadInfo? {
return getPreferences(context).tryGetUpload(transferId);
}
fun cacheUploadProgress(context: Context, transferId: String, transferredChunkIndex: Int): UploadInfo {
getPreferences(context).run {
// get or create entry, update and save
val uploadInfo = tryGetUpload(transferId)!!
uploadInfo.latestChunkIndex = transferredChunkIndex
return saveUpload(uploadInfo)
}
}
fun setUploadStatus(context: Context, transferId: String, status: TransferStatus): UploadInfo {
getPreferences(context).run {
val upload = tryGetUpload(transferId) ?: registerUpload(context, transferId, status)
if (upload.status != status) {
upload.status = status
saveUpload(upload)
}
return upload
}
}
/**
* Registers a new upload transfer. This would simply (and silently) override any
* existing registration.
*/
fun registerUpload(context: Context, transferId: String, status: TransferStatus): UploadInfo {
getPreferences(context).run {
val upload = UploadInfo(transferId).apply {
this.status = status
}
return saveUpload(upload)
}
}
private fun getPreferences(context: Context): SharedPreferences {
return context.getSharedPreferences(
PREFERENCES_NAME,
Context.MODE_PRIVATE
)
}
private fun SharedPreferences.tryGetUpload(transferId: String): UploadInfo? {
val data: String? = getString(transferId, null)
return if (data == null)
null
else
gson.fromJson(data, UploadInfo::class.java)
}
private fun SharedPreferences.saveUpload(uploadInfo: UploadInfo): UploadInfo {
val editor = edit()
editor.putString(uploadInfo.transferId, gson.toJson(uploadInfo))
editor.apply()
return uploadInfo;
}
}
Kotlin/Android新手看这里:)。我正在使用 CoroutineWorker 进行分块上传,但没有看到一种内置的方法来为我的工作人员维护状态,以防发生重试,但我很难相信这样的事情会丢失...
我的用例如下:
- 使用要作为输入数据上传的文件的路径创建工作请求
- Worker 遍历文件并分块执行上传。正在跟踪最新上传的
chunkIndex
。 - 万一发生错误和随后的
Retry()
,工作人员 以某种方式检索 当前块索引并恢复,而不是再次从头开始。
所以基本上,我真的只需要保留那个 chunkIndex
标志。我研究了设置进度,但这似乎在重试时成功或失败(一次成功,另一次尝试不可用)。
override suspend fun doWork(): Result {
try {
// TODO check if we are resuming with a given chunk index
chunkIndex = ...
// do the work
performUpload(...)
return Result.success()
} catch (e: Exception) {
// TODO cache the chunk index
return Result.retry()
}
}
我是否忽略了什么,或者我真的必须将该索引存储在 worker 之外吗?
您有一个很好的用例,但不幸的是 您无法在 Worker
class 中缓存数据或将数据传递给下一个 Worker
对象重试! 如您所料,您必须将索引存储在 WorkManager
提供的结构之外!
长答案,
Worker
对象可以接收和return数据。它可以从 getInputData()
方法访问数据。如果你 chain tasks,一个工人的输出可以为下一个工人输入。这可以通过 returning Result.success(output)
来完成(见下面的代码)
public Result doWork() {
int chunkIndex = upload();
//...set the output, and we're done!
Data output = new Data.Builder()
.putInt(KEY_RESULT, result)
.build();
return Result.success(output);
}
所以问题是我们无法 return 重试案例的数据,只有失败和成功的案例! (Result.retry(Data data)
缺少方法!)
参考:官方documentation and API.
正如 GB 的回答中所述,似乎没有办法在 worker 中缓存数据,或者做一个 Result.retry(data)
。我最后只是用 SharedPreferences
做了一个快速破解。
下面的解决方案。对它持保留态度,我总共有大约 10 个小时的 Kotlin 知识;)
var latestChunkIndex = -1
override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
try {
// get cached entry (simplified - no checking for fishy status or anything)
val transferId = id.toString()
var uploadInfo: UploadInfo = TransferCache.tryGetUpload(applicationContext, transferId) ?: TransferCache.registerUpload(applicationContext, transferId, TransferStatus.InProgress)
if(uploadInfo.status != TransferStatus.InProgress) {
TransferCache.setUploadStatus(applicationContext, transferId, TransferStatus.InProgress)
}
// resolve the current chunk - this will allow us to resume in case we're retrying
latestChunkIndex = uploadInfo.latestChunkIndex
// do the actual work
upload()
// update status and complete
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Success)
Result.success()
} catch (e: Exception) {
if (runAttemptCount > 20) {
// give up
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Error)
Result.failure()
}
// update status and schedule retry
TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Paused)
Result.retry()
}
}
在我的 upload
函数中,我只是在跟踪我的缓存(我也可以在 doWork
方法的异常处理程序中执行此操作,但我将使用缓存状态检查的条目也是如此,而且很便宜):
private suspend fun upload() {
while ((latestChunkIndex + 1) * defaultChunkSize < fileSize) {
// doing the actual upload
...
// increment chunk number and store as progress
latestChunkIndex += 1
TransferCache.cacheUploadProgress(applicationContext, id.toString(), latestChunkIndex)
}
}
和 TransferCache
看起来像这样(请注意那里 没有 内务处理,所以如果不清理,它会继续增长!)
class UploadInfo() {
var transferId: String = ""
var status: TransferStatus = TransferStatus.Undefined
var latestChunkIndex: Int = -1
constructor(transferId: String) : this() {
this.transferId = transferId
}
}
object TransferCache {
private const val PREFERENCES_NAME = "${BuildConfig.APPLICATION_ID}.transfercache"
private val gson = Gson()
fun tryGetUpload(context: Context, transferId: String): UploadInfo? {
return getPreferences(context).tryGetUpload(transferId);
}
fun cacheUploadProgress(context: Context, transferId: String, transferredChunkIndex: Int): UploadInfo {
getPreferences(context).run {
// get or create entry, update and save
val uploadInfo = tryGetUpload(transferId)!!
uploadInfo.latestChunkIndex = transferredChunkIndex
return saveUpload(uploadInfo)
}
}
fun setUploadStatus(context: Context, transferId: String, status: TransferStatus): UploadInfo {
getPreferences(context).run {
val upload = tryGetUpload(transferId) ?: registerUpload(context, transferId, status)
if (upload.status != status) {
upload.status = status
saveUpload(upload)
}
return upload
}
}
/**
* Registers a new upload transfer. This would simply (and silently) override any
* existing registration.
*/
fun registerUpload(context: Context, transferId: String, status: TransferStatus): UploadInfo {
getPreferences(context).run {
val upload = UploadInfo(transferId).apply {
this.status = status
}
return saveUpload(upload)
}
}
private fun getPreferences(context: Context): SharedPreferences {
return context.getSharedPreferences(
PREFERENCES_NAME,
Context.MODE_PRIVATE
)
}
private fun SharedPreferences.tryGetUpload(transferId: String): UploadInfo? {
val data: String? = getString(transferId, null)
return if (data == null)
null
else
gson.fromJson(data, UploadInfo::class.java)
}
private fun SharedPreferences.saveUpload(uploadInfo: UploadInfo): UploadInfo {
val editor = edit()
editor.putString(uploadInfo.transferId, gson.toJson(uploadInfo))
editor.apply()
return uploadInfo;
}
}