WorkManager 两次启动 Worker

WorkManager start Worker twice

我有一组大任务要在后台执行:

  1. 加载数据
  2. 正在解析一堆文件并将它们存储在 Room

出于这个原因,我用相同的 tag 创建了独特的 Worker 链。

class GtfsStaticManager() {
    private val workerManager = WorkManager.getInstance()

    override fun load() {
        val constraints = Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()

        val inputData = GtfsStaticLoadDataWorker.inputData(staticUrl, cacheDir)
        // 1. Loading data
        val downloadWorkRequest = OneTimeWorkRequest.Builder(GtfsStaticLoadDataWorker::class.java)
            .addTag("GTFS")
            .setConstraints(constraints)
            .setInputData(inputData)
            .build()

        // 2. List of Workers to parse and store data to the Room
        val parseWorkers = GtfsFile.values().map {
            OneTimeWorkRequest.Builder(GtfsStaticParseFileWorker::class.java)
                .setInputData(GtfsStaticParseFileWorker.inputData(it.file, cacheDir + File.separator + "feed"))
                .addTag("GTFS")
                .build()
        }

        workerManager
            .beginUniqueWork("GTFS", ExistingWorkPolicy.KEEP, downloadWorkRequest)
            .then(parseWorkers)
            .enqueue()
    }
}

除一件事外一切正常:其中一个文件有 400 万条记录,完成它大约需要 10-15 分钟。一段时间后,我注意到它再次入队 但是 第一份工作仍然是 运行,因此我在后台有 2 个巨大的工作 运行,当然我的数据重复了。

我遵循了 codelabs 教程,我是否遗漏了什么?

下面是我的Worker解析逻辑:

class GtfsStaticParseFileWorker(
    context: Context,
    workerParameters: WorkerParameters
) : Worker(context, workerParameters) {
    private val fileName: String get() = inputData.getString(FILE_NAME) ?: ""
    private val cacheDir: String get() = inputData.getString(UNZIP_FOLDER) ?: ""

    companion object {
        private const val FILE_NAME = "FILE_NAME"
        private const val UNZIP_FOLDER = "UNZIP_FOLDER"
        fun inputData(fileName: String, cacheDir: String) = Data
            .Builder()
            .putString(FILE_NAME, fileName)
            .putString(UNZIP_FOLDER, cacheDir)
            .build()
    }

    override fun doWork(): Result {
        val db = LvivTransportTrackerDataBase.getUpdateInstance(applicationContext)
        val agencyRepository = AgencyRepository(db.agencyDao())
        val calendarRepository = CalendarRepository(db.calendarDao())
        val calendarDateRepository = CalendarDateRepository(db.calendarDateDao())
        val routeRepository = RouteRepository(db.routeDao())
        val stopTimeRepository = StopTimeRepository(db.stopTimeDao())
        val stopRepository = StopRepository(db.stopDao())
        val tripRepository = TripRepository(db.tripDao())

        val file = File(cacheDir + File.separator + fileName)
        val fileType = GtfsFile.from(fileName) ?: return Result.failure()

        when (fileType) {
            GtfsFile.Agency -> agencyRepository.deleteAll()
            GtfsFile.CalendarDates -> calendarDateRepository.deleteAll()
            GtfsFile.Calendar -> calendarRepository.deleteAll()
            GtfsFile.Routes -> routeRepository.deleteAll()
            GtfsFile.StopTimes -> stopTimeRepository.deleteAll()
            GtfsFile.Stops -> stopRepository.deleteAll()
            GtfsFile.Trips -> tripRepository.deleteAll()
        }

        FileInputStream(file).use { fileInputStream ->
            InputStreamReader(fileInputStream).use inputStreamReader@{ inputStreamReader ->
                val bufferedReader = BufferedReader(inputStreamReader)
                val headers = bufferedReader.readLine()?.split(',') ?: return@inputStreamReader
                var line: String? = bufferedReader.readLine()

                while (line != null) {
                    val mapLine = headers.zip(line.split(',')).toMap()

                    Log.d("GtfsStaticParse", "$fileType: $line")
                    when (fileType) {
                        GtfsFile.Agency -> agencyRepository.create(AgencyEntity(mapLine))
                        GtfsFile.CalendarDates -> calendarDateRepository.create(CalendarDateEntity(mapLine))
                        GtfsFile.Calendar -> calendarRepository.create(CalendarEntity(mapLine))
                        GtfsFile.Routes -> routeRepository.create(RouteEntity(mapLine))
                        GtfsFile.StopTimes -> stopTimeRepository.create(StopTimeEntity(mapLine))
                        GtfsFile.Stops -> stopRepository.create(StopEntity(mapLine))
                        GtfsFile.Trips -> tripRepository.create(TripEntity(mapLine))
                    }

                    line = bufferedReader.readLine()
                }
            }
        }

        return Result.success()
    }
}

P.S。我的依赖是 implementation "android.arch.work:work-runtime:1.0.0"

WorkManager 中的 Worker 类 的执行时间限制为 10 分钟。
来自 WorkManager guide on how to handle cancellation:

The system instructed your app to stop your work for some reason. This can happen if you exceed the execution deadline of 10 minutes. The work is scheduled for retry at a later time.

在您的情况下,您没有处理工作的停止,但 WorkManager 将忽略任何结果,因为它将作业标记为 "cancelled" 并且它会在可能的情况下再次执行它。

这可能会导致您遇到双重执行。

在不了解您想要实现的目标的情况下很难提出替代方法,但是,作为一般规则,WorkManager 适用于需要保证执行的可延迟任务。

WorkManager documentation 在 1.0 版本后得到了扩展,您可以在那里找到更多信息。

您需要首先检查下载的文件是否存在于下载文件的路径上,然后将字节存储在循环内的 SharedPreferences 中,这些字节是先前从下载路径读取的,一旦您的作业再次启动,则首先检查下载的字节和下一次从该偏移量开始。

示例代码:

RandomAccessFile seeker = new RandomAccessFile(fname, "r");
seeker.seek(readOffset()); // move to the offset
seeker.readLine(); // and read the String

如果您需要对数据库条目进行相同的检查,则将完成的行的状态标记为“成功”,以便下次跳过该行。