Android 工作人员 - 在重试期间更新和保留状态

Android worker - update and preserve state across retries

Kotlin/Android新手看这里:)。我正在使用 CoroutineWorker 进行分块上传,但没有看到一种内置的方法来为我的工作人员维护状态,以防发生重试,但我很难相信这样的事情会丢失...

我的用例如下:

  1. 使用要作为输入数据上传的文件的路径创建工作请求
  2. Worker 遍历文件并分块执行上传。正在跟踪最新上传的chunkIndex
  3. 万一发生错误和随后的 Retry(),工作人员 以某种方式检索 当前块索引并恢复,而不是再次从头开始。

所以基本上,我真的只需要保留那个 chunkIndex 标志。我研究了设置进度,但这似乎在重试时成功或失败(一次成功,另一次尝试不可用)。

override suspend fun doWork(): Result {
    try {
        // TODO check if we are resuming with a given chunk index
        chunkIndex = ...

        // do the work
        performUpload(...)

        return Result.success()

    } catch (e: Exception) {
        // TODO cache the chunk index


        return Result.retry()
    }
}

我是否忽略了什么,或者我真的必须将该索引存储在 worker 之外吗?

您有一个很好的用例,但不幸的是 您无法在 Worker class 中缓存数据或将数据传递给下一个 Worker 对象重试! 如您所料,您必须将索引存储在 WorkManager 提供的结构之外!

长答案,

Worker对象可以接收和return数据。它可以从 getInputData() 方法访问数据。如果你 chain tasks,一个工人的输出可以为下一个工人输入。这可以通过 returning Result.success(output) 来完成(见下面的代码)

public Result doWork() {
        int chunkIndex = upload();

        //...set the output, and we're done!
        Data output = new Data.Builder()
            .putInt(KEY_RESULT, result)
            .build();
        return Result.success(output);
}

所以问题是我们无法 return 重试案例的数据,只有失败和成功的案例! (Result.retry(Data data) 缺少方法!)

参考:官方documentation and API.

正如 GB 的回答中所述,似乎没有办法在 worker 中缓存数据,或者做一个 Result.retry(data)。我最后只是用 SharedPreferences 做了一个快速破解。

下面的解决方案。对它持保留态度,我总共有大约 10 个小时的 Kotlin 知识;)

var latestChunkIndex = -1

override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
    try {
        // get cached entry (simplified - no checking for fishy status or anything)
        val transferId = id.toString()
        var uploadInfo: UploadInfo = TransferCache.tryGetUpload(applicationContext, transferId) ?: TransferCache.registerUpload(applicationContext, transferId, TransferStatus.InProgress)

        if(uploadInfo.status != TransferStatus.InProgress) {
            TransferCache.setUploadStatus(applicationContext, transferId, TransferStatus.InProgress)
        }

        // resolve the current chunk - this will allow us to resume in case we're retrying
        latestChunkIndex = uploadInfo.latestChunkIndex

        // do the actual work
        upload()

        // update status and complete
        TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Success)
        Result.success()
    } catch (e: Exception) {
        if (runAttemptCount > 20) {
            // give up
            TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Error)
            Result.failure()
        }

        // update status and schedule retry
        TransferCache.setUploadStatus(applicationContext, id.toString(), TransferStatus.Paused)
        Result.retry()
    }
}

在我的 upload 函数中,我只是在跟踪我的缓存(我也可以在 doWork 方法的异常处理程序中执行此操作,但我将使用缓存状态检查的条目也是如此,而且很便宜):

private suspend fun upload() {
    while ((latestChunkIndex + 1) * defaultChunkSize < fileSize) {

        // doing the actual upload
        ...

        // increment chunk number and store as progress
        latestChunkIndex += 1
        TransferCache.cacheUploadProgress(applicationContext, id.toString(), latestChunkIndex)
    }
}

TransferCache 看起来像这样(请注意那里 没有 内务处理,所以如果不清理,它会继续增长!)

class UploadInfo() {
    var transferId: String = ""
    var status: TransferStatus = TransferStatus.Undefined
    var latestChunkIndex: Int = -1

    constructor(transferId: String) : this() {
        this.transferId = transferId
    }
}


object TransferCache {

    private const val PREFERENCES_NAME = "${BuildConfig.APPLICATION_ID}.transfercache"
    private val gson = Gson()

    fun tryGetUpload(context: Context, transferId: String): UploadInfo? {
        return getPreferences(context).tryGetUpload(transferId);
    }


    fun cacheUploadProgress(context: Context, transferId: String, transferredChunkIndex: Int): UploadInfo {
        getPreferences(context).run {
            // get or create entry, update and save
            val uploadInfo = tryGetUpload(transferId)!!
            uploadInfo.latestChunkIndex = transferredChunkIndex
            return saveUpload(uploadInfo)
        }
    }


    fun setUploadStatus(context: Context, transferId: String, status: TransferStatus): UploadInfo {
        getPreferences(context).run {
            val upload = tryGetUpload(transferId) ?: registerUpload(context, transferId, status)
            if (upload.status != status) {
                upload.status = status
                saveUpload(upload)
            }

            return upload
        }
    }


    /**
     * Registers a new upload transfer. This would simply (and silently) override any
     * existing registration.
     */
    fun registerUpload(context: Context, transferId: String, status: TransferStatus): UploadInfo {
        getPreferences(context).run {
            val upload = UploadInfo(transferId).apply {
                this.status = status
            }
            return saveUpload(upload)
        }
    }


    private fun getPreferences(context: Context): SharedPreferences {
        return context.getSharedPreferences(
            PREFERENCES_NAME,
            Context.MODE_PRIVATE
        )
    }


    private fun SharedPreferences.tryGetUpload(transferId: String): UploadInfo? {
        val data: String? = getString(transferId, null)
        return if (data == null)
            null
        else
            gson.fromJson(data, UploadInfo::class.java)
    }


    private fun SharedPreferences.saveUpload(uploadInfo: UploadInfo): UploadInfo {
        val editor = edit()
        editor.putString(uploadInfo.transferId, gson.toJson(uploadInfo))
        editor.apply()
        return uploadInfo;
    }
 }