如何正确使用 Flow.onStart {} 重新获取缓存内容?

How do I properly use Flow.onStart {} to re-fetch cached content?

我有一个获取 Something 的方法,为简单起见,我们将其设为 String。该方法应该 return 一个流,它最初发出缓存的字符串,然后在查询我的 API.

后发出“新鲜”值

谢天谢地,每当给定 table 更新时,Room 都会发出新数据,因此这部分逻辑开箱即用。我也可以使用 refreshing/re-fetching。但是,当我尝试使用 .onStart{}(恕我直言,它看起来更简洁一些)时,功能和我的理解都会分崩离析:/

这是一个概念证明,应该 运行 在 IntelliJ 或 Android Studio 中,没有太多不寻常的依赖项:

// Room automatically emits new values on dbFlow when the relevant table is updated
val dbFlow = MutableStateFlow("cachedValue")

// refresh simulates fetchSomethingFromApi().also { someDao.updateData(it) }
val refresh = suspend {
    delay(1000) // simulate API delay
    stream.value = "freshValueFromAPI"
}

suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        coroutineScope {
            launch {
                refresh()
            }
        }
    }

suspend fun thisWorks(): Flow<String> = flow {
    coroutineScope {
        launch {
            refresh()
        }
        dbFlow.collect {
            emit(it)
        }
    }
}


如何测试:

runBlocking {
    thisWorks().take(2).collect {
        println(it)
    }
}

或:

runBlocking {
    doesNotWork().take(2).collect {
        println(it)
    }
}

我希望两者产生相同的结果,但是 具有 .onStart {} 的那个从不发出缓存值,所以 .take(2) 最终超时(因为它只发出一次)。

这是怎么回事?

出现这种行为的原因是
a) onStart { ... }在流量采集前执行
在一个简单的例子中:

flow {
    emit("foo")  
}.onStart {
    println("bar")
}.collect {
    println(it)
}

产生

bar
foo

和 b) coroutineScope {...} 等待直到块内启动的所有子协程都完成
另一个例子:

suspend fun foo() {
    coroutineScope {
        launch {
            delay(1000)
        }
    }
}

调用此函数需要约 1000 毫秒,因为 coroutineScope 将等待内部子协程完成

现在举个例子

suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        coroutineScope {
            launch {
                refresh()
            }
        }
    }

根据 b),这与

具有相同的行为
suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        refresh()
    }

由于onStart{...}是在流量收集之前执行的,所以这个和写

是一样的
suspend fun doesNotWork(): Flow<String> = flow {
    refresh()
    // could be simplified  to emitAll(dbFlow)
    dbFlow.collect {
        emit(it)
    }
}

现在您看到这与您的工作示例有何不同。 您首先从 api 刷新,然后开始从数据库发出值。 当您的工作示例启动一个新协程时,它从您的 api 异步刷新并立即开始从您的数据库发出值。