如何在不需要时启动可以 "switched off" 的多个流?
How to start a multiple flows that can be "switched off" when not needed?
场景
List<Message>
在可组合项中
messages = mutableStateListOf<Message>
视图模型
messages
取决于两个动作 getAll
或 search
- 在这两种情况下我们都观察到 SQLite,但是 在一个瞬间只有一个流应该处于活动状态(用户正在查看全部或正在搜索)
- Flow 与 SQLite 一起使用,因为新消息可以来自网络。
问题
- 如何在
getAll
和search
流之间切换
- 当进行新的搜索时,应取消旧的
search
流程并开始新的流程。如何做到这一点?
当前实施有两个流程
@Composable
fun ListOfMessages() {
val messages = viewModel.messages
}
// -----------------------------
// ViewModel
// -----------------------------
@ExperimentalCoroutinesApi
class MessageViewModel(application: Application) {
val messages = mutableStateListOf<Message>()
val isLoading = mutableStateOf(false)
init {
fetchMessages()
}
fun fetchMessages() {
MessageUseCase(messagesDB).getAll().onEach { dataState ->
isLoading.value = dataState.loading
dataState.data?.let { data ->
messages.clear()
messages.addAll(data)
}
dataState.error?.let { error ->
// UIState -> error message
}
}.launchIn(viewModelScope)
}
fun SearchWithInMessage(q: String) {
MessageUseCase(messagesDB).search(q).onEach { dataState ->
isLoading.value = dataState.loading
dataState.data?.let { data ->
messages.clear()
messages.addAll(data)
}
dataState.error?.let { error ->
// UIState -> error message
}
}
}
}
// -----------------------------
// Use Cases
// -----------------------------
class MessageUseCase(messagesDB: messageDao) {
@ExperimentalCoroutinesApi
fun getAll(): Flow<DataState<List<Message>>> = channelFlow { {
send(DataState.loading())
try {
fetchAndSaveLatestMessagesFromRemote()
val messages = messagesDB.getAllStream()
messages.collectLatest { list ->
// business logic
send(DataState.success(list))
}
} catch (e: Exception){
send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
}
}
@ExperimentalCoroutinesApi
fun search(q: String): Flow<DataState<List<Message>>> = channelFlow { {
send(DataState.loading())
try {
fetchAndSaveSearchedMessageFromRemote(q)
val messages = messagesDB.searchStream(q)
messages.collectLatest { list ->
// business logic
send(DataState.success(list))
}
} catch (e: Exception){
send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
}
}
}
// -----------------------------
// DB and Network calls
// -----------------------------
您可以使用 flatMapLatest()。它接收流的流并始终复制发送给它的最新流的项目,取消以前的流。因此,它作为可以在其他流之间切换的流。
根据您的 getAll()
和 search()
的声明方式,它可能是这样的:
private val state = MutableStateFlow<String?>(null)
private val messages = state.flatMapLatest {
if (it == null) getAll() else search(it)
}
suspend fun requestSearch(search: String) = state.emit(search)
suspend fun requestAll() = state.emit(null)
fun getAll(): Flow<List<Message>> = TODO()
fun search(search: String): Flow<List<Message>> = TODO()
state
代表我们目前需要的搜索状态。每当我们向它发出信号时,它都会请求 getAll()
或 search()
流并将其复制为 messages
流。
然后,要将其与 Compose 一起使用,您需要将其转换为 State
:
messages.collectAsState(emptyList())
场景
List<Message>
在可组合项中messages = mutableStateListOf<Message>
视图模型messages
取决于两个动作getAll
或search
- 在这两种情况下我们都观察到 SQLite,但是 在一个瞬间只有一个流应该处于活动状态(用户正在查看全部或正在搜索)
- Flow 与 SQLite 一起使用,因为新消息可以来自网络。
问题
- 如何在
getAll
和search
流之间切换 - 当进行新的搜索时,应取消旧的
search
流程并开始新的流程。如何做到这一点?
当前实施有两个流程
@Composable
fun ListOfMessages() {
val messages = viewModel.messages
}
// -----------------------------
// ViewModel
// -----------------------------
@ExperimentalCoroutinesApi
class MessageViewModel(application: Application) {
val messages = mutableStateListOf<Message>()
val isLoading = mutableStateOf(false)
init {
fetchMessages()
}
fun fetchMessages() {
MessageUseCase(messagesDB).getAll().onEach { dataState ->
isLoading.value = dataState.loading
dataState.data?.let { data ->
messages.clear()
messages.addAll(data)
}
dataState.error?.let { error ->
// UIState -> error message
}
}.launchIn(viewModelScope)
}
fun SearchWithInMessage(q: String) {
MessageUseCase(messagesDB).search(q).onEach { dataState ->
isLoading.value = dataState.loading
dataState.data?.let { data ->
messages.clear()
messages.addAll(data)
}
dataState.error?.let { error ->
// UIState -> error message
}
}
}
}
// -----------------------------
// Use Cases
// -----------------------------
class MessageUseCase(messagesDB: messageDao) {
@ExperimentalCoroutinesApi
fun getAll(): Flow<DataState<List<Message>>> = channelFlow { {
send(DataState.loading())
try {
fetchAndSaveLatestMessagesFromRemote()
val messages = messagesDB.getAllStream()
messages.collectLatest { list ->
// business logic
send(DataState.success(list))
}
} catch (e: Exception){
send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
}
}
@ExperimentalCoroutinesApi
fun search(q: String): Flow<DataState<List<Message>>> = channelFlow { {
send(DataState.loading())
try {
fetchAndSaveSearchedMessageFromRemote(q)
val messages = messagesDB.searchStream(q)
messages.collectLatest { list ->
// business logic
send(DataState.success(list))
}
} catch (e: Exception){
send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
}
}
}
// -----------------------------
// DB and Network calls
// -----------------------------
您可以使用 flatMapLatest()。它接收流的流并始终复制发送给它的最新流的项目,取消以前的流。因此,它作为可以在其他流之间切换的流。
根据您的 getAll()
和 search()
的声明方式,它可能是这样的:
private val state = MutableStateFlow<String?>(null)
private val messages = state.flatMapLatest {
if (it == null) getAll() else search(it)
}
suspend fun requestSearch(search: String) = state.emit(search)
suspend fun requestAll() = state.emit(null)
fun getAll(): Flow<List<Message>> = TODO()
fun search(search: String): Flow<List<Message>> = TODO()
state
代表我们目前需要的搜索状态。每当我们向它发出信号时,它都会请求 getAll()
或 search()
流并将其复制为 messages
流。
然后,要将其与 Compose 一起使用,您需要将其转换为 State
:
messages.collectAsState(emptyList())