协程的长寿服务

Long living service with coroutines

我想创建一个可以处理事件的长期服务。 它通过 postEvent 接收事件,将其存储在存储库(带有底层数据库)中,并在有足够的事件时发送一批 api。

我还想按需关闭它。 此外,我想测试这项服务。

这就是我到目前为止的想法。目前我正在努力进行单元测试。 要么在事件通过 fixture.postEvent() 发送到服务后过早关闭数据库,要么测试本身陷入某种死锁(正在尝试各种上下文 + 作业配置)。

我做错了什么?

class EventSenderService(
        private val repository: EventRepository,
        private val api: Api,
        private val serializer: GsonSerializer,
        private val requestBodyBuilder: EventRequestBodyBuilder,
) : EventSender, CoroutineScope {

    private val eventBatchSize = 25

    val job = Job()
    private val channel = Channel<Unit>()

    init {
        job.start()

        launch {
            for (event in channel) {
                val trackingEventCount = repository.getTrackingEventCount()

                if (trackingEventCount < eventBatchSize) continue

                readSendDelete()
            }
        }
    }

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    override fun postEvent(event: Event) {
        launch(Dispatchers.IO) {
            writeEventToDatabase(event)
        }
    }

    override fun close() {
        channel.close()
        job.cancel()
    }

    private fun readSendDelete() {
        try {
            val events = repository.getTrackingEvents(eventBatchSize)

            val request = requestBodyBuilder.buildFor(events).blockingGet()

            api.postEvents(request).blockingGet()

            repository.deleteTrackingEvents(events)
        } catch (throwable: Throwable) {
            Log.e(throwable)
        }
    }

    private suspend fun writeEventToDatabase(event: Event) {
        try {
            val trackingEvent = TrackingEvent(eventData = serializer.toJson(event))
            repository.insert(trackingEvent)
            channel.send(Unit)
        } catch (throwable: Throwable) {
            throwable.printStackTrace()
            Log.e(throwable)
        }
    }
}

测试

@RunWith(RobolectricTestRunner::class)
class EventSenderServiceTest : CoroutineScope {

    @Rule
    @JvmField
    val instantExecutorRule = InstantTaskExecutorRule()

    private val api: Api = mock {
        on { postEvents(any()) } doReturn Single.just(BaseResponse())
    }
    private val serializer: GsonSerializer = mock {
        on { toJson<Any>(any()) } doReturn "event_data"
    }
    private val bodyBuilder: EventRequestBodyBuilder = mock {
        on { buildFor(any()) } doReturn Single.just(TypedJsonString.buildRequestBody("[ { event } ]"))
    }
    val event = Event(EventName.OPEN_APP)

    private val database by lazy {
        Room.inMemoryDatabaseBuilder(
                RuntimeEnvironment.systemContext,
                Database::class.java
        ).allowMainThreadQueries().build()
    }

    private val repository by lazy { database.getRepo() }

    val fixture by lazy {
        EventSenderService(
                repository = repository,
                api = api,
                serializer = serializer,
                requestBodyBuilder = bodyBuilder,
        )
    }

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + fixture.job

    @Test
    fun eventBundling_success() = runBlocking {

        (1..40).map { Event(EventName.OPEN_APP) }.forEach { fixture.postEvent(it) }

        fixture.job.children.forEach { it.join() }

        verify(api).postEvents(any())
        assertEquals(15, eventDao.getTrackingEventCount())
    }
}

按照@Marko Topolnik 的建议更新代码后 - 添加 fixture.job.children.forEach { it.join() } 测试永远不会完成。

你做错的一件事与此有关:

override fun postEvent(event: Event) {
    launch(Dispatchers.IO) {
        writeEventToDatabase(event)
    }
}

postEvent 启动一个即发即弃的异步作业,最终会将事件写入数据库。您的测试快速连续创建 40 个这样的作业,并在它们排队时断言预期状态。不过,我不明白为什么你在发布 40 个事件后断言 15 个事件。

要解决此问题,您应该使用已有的线路:

fixture.job.join()

但将其更改为

fixture.job.children.forEach { it.join() }

并将其放在较低的位置,在创建事件的循环之后。


我没有考虑到您在 init 块中启动的 long-运行 消费者作业。这使我在上面给出的加入 master 作业的所有子项的建议无效。

相反,您将不得不进行更多更改。将 postEvent return 作为它启动的作业,并在测试中收集所有这些作业并加入它们。这样更有选择性,避免加入长寿工作。


作为一个单独的问题,您的批处理方法并不理想,因为它总是会在执行任何操作之前等待完整的批处理。每当有没有事件的平静期时,事件将无限期地坐在不完整的批次中。

最好的方法是自然批处理,在这种方法中,您会急切地耗尽输入队列。当传入的事件大量涌入时,批次自然会增长,当它们滴入时,它们仍会立即得到服务。可以看到基本思路here.