协程的长寿服务
Long living service with coroutines
我想创建一个可以处理事件的长期服务。
它通过 postEvent
接收事件,将其存储在存储库(带有底层数据库)中,并在有足够的事件时发送一批 api。
我还想按需关闭它。
此外,我想测试这项服务。
这就是我到目前为止的想法。目前我正在努力进行单元测试。
要么在事件通过 fixture.postEvent()
发送到服务后过早关闭数据库,要么测试本身陷入某种死锁(正在尝试各种上下文 + 作业配置)。
我做错了什么?
class EventSenderService(
private val repository: EventRepository,
private val api: Api,
private val serializer: GsonSerializer,
private val requestBodyBuilder: EventRequestBodyBuilder,
) : EventSender, CoroutineScope {
private val eventBatchSize = 25
val job = Job()
private val channel = Channel<Unit>()
init {
job.start()
launch {
for (event in channel) {
val trackingEventCount = repository.getTrackingEventCount()
if (trackingEventCount < eventBatchSize) continue
readSendDelete()
}
}
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + job
override fun postEvent(event: Event) {
launch(Dispatchers.IO) {
writeEventToDatabase(event)
}
}
override fun close() {
channel.close()
job.cancel()
}
private fun readSendDelete() {
try {
val events = repository.getTrackingEvents(eventBatchSize)
val request = requestBodyBuilder.buildFor(events).blockingGet()
api.postEvents(request).blockingGet()
repository.deleteTrackingEvents(events)
} catch (throwable: Throwable) {
Log.e(throwable)
}
}
private suspend fun writeEventToDatabase(event: Event) {
try {
val trackingEvent = TrackingEvent(eventData = serializer.toJson(event))
repository.insert(trackingEvent)
channel.send(Unit)
} catch (throwable: Throwable) {
throwable.printStackTrace()
Log.e(throwable)
}
}
}
测试
@RunWith(RobolectricTestRunner::class)
class EventSenderServiceTest : CoroutineScope {
@Rule
@JvmField
val instantExecutorRule = InstantTaskExecutorRule()
private val api: Api = mock {
on { postEvents(any()) } doReturn Single.just(BaseResponse())
}
private val serializer: GsonSerializer = mock {
on { toJson<Any>(any()) } doReturn "event_data"
}
private val bodyBuilder: EventRequestBodyBuilder = mock {
on { buildFor(any()) } doReturn Single.just(TypedJsonString.buildRequestBody("[ { event } ]"))
}
val event = Event(EventName.OPEN_APP)
private val database by lazy {
Room.inMemoryDatabaseBuilder(
RuntimeEnvironment.systemContext,
Database::class.java
).allowMainThreadQueries().build()
}
private val repository by lazy { database.getRepo() }
val fixture by lazy {
EventSenderService(
repository = repository,
api = api,
serializer = serializer,
requestBodyBuilder = bodyBuilder,
)
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + fixture.job
@Test
fun eventBundling_success() = runBlocking {
(1..40).map { Event(EventName.OPEN_APP) }.forEach { fixture.postEvent(it) }
fixture.job.children.forEach { it.join() }
verify(api).postEvents(any())
assertEquals(15, eventDao.getTrackingEventCount())
}
}
按照@Marko Topolnik 的建议更新代码后 - 添加 fixture.job.children.forEach { it.join() }
测试永远不会完成。
你做错的一件事与此有关:
override fun postEvent(event: Event) {
launch(Dispatchers.IO) {
writeEventToDatabase(event)
}
}
postEvent
启动一个即发即弃的异步作业,最终会将事件写入数据库。您的测试快速连续创建 40 个这样的作业,并在它们排队时断言预期状态。不过,我不明白为什么你在发布 40 个事件后断言 15 个事件。
要解决此问题,您应该使用已有的线路:
fixture.job.join()
但将其更改为
fixture.job.children.forEach { it.join() }
并将其放在较低的位置,在创建事件的循环之后。
我没有考虑到您在 init
块中启动的 long-运行 消费者作业。这使我在上面给出的加入 master 作业的所有子项的建议无效。
相反,您将不得不进行更多更改。将 postEvent
return 作为它启动的作业,并在测试中收集所有这些作业并加入它们。这样更有选择性,避免加入长寿工作。
作为一个单独的问题,您的批处理方法并不理想,因为它总是会在执行任何操作之前等待完整的批处理。每当有没有事件的平静期时,事件将无限期地坐在不完整的批次中。
最好的方法是自然批处理,在这种方法中,您会急切地耗尽输入队列。当传入的事件大量涌入时,批次自然会增长,当它们滴入时,它们仍会立即得到服务。可以看到基本思路here.
我想创建一个可以处理事件的长期服务。
它通过 postEvent
接收事件,将其存储在存储库(带有底层数据库)中,并在有足够的事件时发送一批 api。
我还想按需关闭它。 此外,我想测试这项服务。
这就是我到目前为止的想法。目前我正在努力进行单元测试。
要么在事件通过 fixture.postEvent()
发送到服务后过早关闭数据库,要么测试本身陷入某种死锁(正在尝试各种上下文 + 作业配置)。
我做错了什么?
class EventSenderService(
private val repository: EventRepository,
private val api: Api,
private val serializer: GsonSerializer,
private val requestBodyBuilder: EventRequestBodyBuilder,
) : EventSender, CoroutineScope {
private val eventBatchSize = 25
val job = Job()
private val channel = Channel<Unit>()
init {
job.start()
launch {
for (event in channel) {
val trackingEventCount = repository.getTrackingEventCount()
if (trackingEventCount < eventBatchSize) continue
readSendDelete()
}
}
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + job
override fun postEvent(event: Event) {
launch(Dispatchers.IO) {
writeEventToDatabase(event)
}
}
override fun close() {
channel.close()
job.cancel()
}
private fun readSendDelete() {
try {
val events = repository.getTrackingEvents(eventBatchSize)
val request = requestBodyBuilder.buildFor(events).blockingGet()
api.postEvents(request).blockingGet()
repository.deleteTrackingEvents(events)
} catch (throwable: Throwable) {
Log.e(throwable)
}
}
private suspend fun writeEventToDatabase(event: Event) {
try {
val trackingEvent = TrackingEvent(eventData = serializer.toJson(event))
repository.insert(trackingEvent)
channel.send(Unit)
} catch (throwable: Throwable) {
throwable.printStackTrace()
Log.e(throwable)
}
}
}
测试
@RunWith(RobolectricTestRunner::class)
class EventSenderServiceTest : CoroutineScope {
@Rule
@JvmField
val instantExecutorRule = InstantTaskExecutorRule()
private val api: Api = mock {
on { postEvents(any()) } doReturn Single.just(BaseResponse())
}
private val serializer: GsonSerializer = mock {
on { toJson<Any>(any()) } doReturn "event_data"
}
private val bodyBuilder: EventRequestBodyBuilder = mock {
on { buildFor(any()) } doReturn Single.just(TypedJsonString.buildRequestBody("[ { event } ]"))
}
val event = Event(EventName.OPEN_APP)
private val database by lazy {
Room.inMemoryDatabaseBuilder(
RuntimeEnvironment.systemContext,
Database::class.java
).allowMainThreadQueries().build()
}
private val repository by lazy { database.getRepo() }
val fixture by lazy {
EventSenderService(
repository = repository,
api = api,
serializer = serializer,
requestBodyBuilder = bodyBuilder,
)
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + fixture.job
@Test
fun eventBundling_success() = runBlocking {
(1..40).map { Event(EventName.OPEN_APP) }.forEach { fixture.postEvent(it) }
fixture.job.children.forEach { it.join() }
verify(api).postEvents(any())
assertEquals(15, eventDao.getTrackingEventCount())
}
}
按照@Marko Topolnik 的建议更新代码后 - 添加 fixture.job.children.forEach { it.join() }
测试永远不会完成。
你做错的一件事与此有关:
override fun postEvent(event: Event) {
launch(Dispatchers.IO) {
writeEventToDatabase(event)
}
}
postEvent
启动一个即发即弃的异步作业,最终会将事件写入数据库。您的测试快速连续创建 40 个这样的作业,并在它们排队时断言预期状态。不过,我不明白为什么你在发布 40 个事件后断言 15 个事件。
要解决此问题,您应该使用已有的线路:
fixture.job.join()
但将其更改为
fixture.job.children.forEach { it.join() }
并将其放在较低的位置,在创建事件的循环之后。
我没有考虑到您在 init
块中启动的 long-运行 消费者作业。这使我在上面给出的加入 master 作业的所有子项的建议无效。
相反,您将不得不进行更多更改。将 postEvent
return 作为它启动的作业,并在测试中收集所有这些作业并加入它们。这样更有选择性,避免加入长寿工作。
作为一个单独的问题,您的批处理方法并不理想,因为它总是会在执行任何操作之前等待完整的批处理。每当有没有事件的平静期时,事件将无限期地坐在不完整的批次中。
最好的方法是自然批处理,在这种方法中,您会急切地耗尽输入队列。当传入的事件大量涌入时,批次自然会增长,当它们滴入时,它们仍会立即得到服务。可以看到基本思路here.