如何正确使用 Flow.onStart {} 重新获取缓存内容?
How do I properly use Flow.onStart {} to re-fetch cached content?
我有一个获取 Something 的方法,为简单起见,我们将其设为 String。该方法应该 return 一个流,它最初发出缓存的字符串,然后在查询我的 API.
后发出“新鲜”值
谢天谢地,每当给定 table 更新时,Room 都会发出新数据,因此这部分逻辑开箱即用。我也可以使用 refreshing/re-fetching。但是,当我尝试使用 .onStart{}
(恕我直言,它看起来更简洁一些)时,功能和我的理解都会分崩离析:/
这是一个概念证明,应该 运行 在 IntelliJ 或 Android Studio 中,没有太多不寻常的依赖项:
// Room automatically emits new values on dbFlow when the relevant table is updated
val dbFlow = MutableStateFlow("cachedValue")
// refresh simulates fetchSomethingFromApi().also { someDao.updateData(it) }
val refresh = suspend {
delay(1000) // simulate API delay
stream.value = "freshValueFromAPI"
}
suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
coroutineScope {
launch {
refresh()
}
}
}
suspend fun thisWorks(): Flow<String> = flow {
coroutineScope {
launch {
refresh()
}
dbFlow.collect {
emit(it)
}
}
}
如何测试:
runBlocking {
thisWorks().take(2).collect {
println(it)
}
}
或:
runBlocking {
doesNotWork().take(2).collect {
println(it)
}
}
我希望两者产生相同的结果,但是 具有 .onStart {}
的那个从不发出缓存值,所以 .take(2)
最终超时(因为它只发出一次)。
这是怎么回事?
出现这种行为的原因是
a) onStart { ... }
在流量采集前执行
在一个简单的例子中:
flow {
emit("foo")
}.onStart {
println("bar")
}.collect {
println(it)
}
产生
bar
foo
和 b) coroutineScope {...}
等待直到块内启动的所有子协程都完成
另一个例子:
suspend fun foo() {
coroutineScope {
launch {
delay(1000)
}
}
}
调用此函数需要约 1000 毫秒,因为 coroutineScope
将等待内部子协程完成
现在举个例子
suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
coroutineScope {
launch {
refresh()
}
}
}
根据 b),这与
具有相同的行为
suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
refresh()
}
由于onStart{...}
是在流量收集之前执行的,所以这个和写
是一样的
suspend fun doesNotWork(): Flow<String> = flow {
refresh()
// could be simplified to emitAll(dbFlow)
dbFlow.collect {
emit(it)
}
}
现在您看到这与您的工作示例有何不同。
您首先从 api 刷新,然后开始从数据库发出值。
当您的工作示例启动一个新协程时,它从您的 api 异步刷新并立即开始从您的数据库发出值。
我有一个获取 Something 的方法,为简单起见,我们将其设为 String。该方法应该 return 一个流,它最初发出缓存的字符串,然后在查询我的 API.
后发出“新鲜”值谢天谢地,每当给定 table 更新时,Room 都会发出新数据,因此这部分逻辑开箱即用。我也可以使用 refreshing/re-fetching。但是,当我尝试使用 .onStart{}
(恕我直言,它看起来更简洁一些)时,功能和我的理解都会分崩离析:/
这是一个概念证明,应该 运行 在 IntelliJ 或 Android Studio 中,没有太多不寻常的依赖项:
// Room automatically emits new values on dbFlow when the relevant table is updated
val dbFlow = MutableStateFlow("cachedValue")
// refresh simulates fetchSomethingFromApi().also { someDao.updateData(it) }
val refresh = suspend {
delay(1000) // simulate API delay
stream.value = "freshValueFromAPI"
}
suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
coroutineScope {
launch {
refresh()
}
}
}
suspend fun thisWorks(): Flow<String> = flow {
coroutineScope {
launch {
refresh()
}
dbFlow.collect {
emit(it)
}
}
}
如何测试:
runBlocking {
thisWorks().take(2).collect {
println(it)
}
}
或:
runBlocking {
doesNotWork().take(2).collect {
println(it)
}
}
我希望两者产生相同的结果,但是 具有 .onStart {}
的那个从不发出缓存值,所以 .take(2)
最终超时(因为它只发出一次)。
这是怎么回事?
出现这种行为的原因是
a) onStart { ... }
在流量采集前执行
在一个简单的例子中:
flow {
emit("foo")
}.onStart {
println("bar")
}.collect {
println(it)
}
产生
bar
foo
和 b) coroutineScope {...}
等待直到块内启动的所有子协程都完成
另一个例子:
suspend fun foo() {
coroutineScope {
launch {
delay(1000)
}
}
}
调用此函数需要约 1000 毫秒,因为 coroutineScope
将等待内部子协程完成
现在举个例子
suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
coroutineScope {
launch {
refresh()
}
}
}
根据 b),这与
具有相同的行为suspend fun doesNotWork(): Flow<String> = dbFlow
.onStart {
refresh()
}
由于onStart{...}
是在流量收集之前执行的,所以这个和写
suspend fun doesNotWork(): Flow<String> = flow {
refresh()
// could be simplified to emitAll(dbFlow)
dbFlow.collect {
emit(it)
}
}
现在您看到这与您的工作示例有何不同。 您首先从 api 刷新,然后开始从数据库发出值。 当您的工作示例启动一个新协程时,它从您的 api 异步刷新并立即开始从您的数据库发出值。