如何在不需要时启动可以 "switched off" 的多个流?

How to start a multiple flows that can be "switched off" when not needed?

场景

  1. List<Message> 在可组合项中
  2. messages = mutableStateListOf<Message> 视图模型
  3. messages 取决于两个动作 getAllsearch
  4. 在这两种情况下我们都观察到 SQLite,但是 在一个瞬间只有一个流应该处于活动状态(用户正在查看全部或正在搜索)
  5. Flow 与 SQLite 一起使用,因为新消息可以来自网络。

问题

  1. 如何在getAllsearch流之间切换
  2. 当进行新的搜索时,应取消旧的 search 流程并开始新的流程。如何做到这一点?

当前实施有两个流程

@Composable
fun ListOfMessages() {
    val messages = viewModel.messages
}

// -----------------------------
// ViewModel
// -----------------------------
@ExperimentalCoroutinesApi
class MessageViewModel(application: Application) {
    val messages = mutableStateListOf<Message>()
    val isLoading = mutableStateOf(false)

    init {
        fetchMessages()
    }

    fun fetchMessages() {
        MessageUseCase(messagesDB).getAll().onEach { dataState ->
            isLoading.value = dataState.loading
            
            dataState.data?.let { data ->
                messages.clear()
                messages.addAll(data)
            }

            dataState.error?.let { error ->
                // UIState -> error message
            }
        }.launchIn(viewModelScope)
    }

    fun SearchWithInMessage(q: String) {
        MessageUseCase(messagesDB).search(q).onEach { dataState ->
            isLoading.value = dataState.loading
            
            dataState.data?.let { data ->
                messages.clear()
                messages.addAll(data)
            }

            dataState.error?.let { error ->
                // UIState -> error message
            }
        }
    }
}

// -----------------------------
// Use Cases
// -----------------------------
class MessageUseCase(messagesDB: messageDao) {
    @ExperimentalCoroutinesApi
    fun getAll(): Flow<DataState<List<Message>>> = channelFlow { {
        send(DataState.loading())

        try {
            fetchAndSaveLatestMessagesFromRemote()
            val messages = messagesDB.getAllStream()
            messages.collectLatest { list ->
                // business logic
                send(DataState.success(list))
            }
        } catch (e: Exception){
            send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
        }
    }

    @ExperimentalCoroutinesApi
    fun search(q: String): Flow<DataState<List<Message>>> = channelFlow { {
        send(DataState.loading())

        try {
            fetchAndSaveSearchedMessageFromRemote(q)
            val messages = messagesDB.searchStream(q)
            messages.collectLatest { list ->
                // business logic
                send(DataState.success(list))
            }
        } catch (e: Exception){
            send(DataState.error<List<Message>>(e.message?: "Unknown Error"))
        }
    }
}

// -----------------------------
// DB and Network calls
// -----------------------------

您可以使用 flatMapLatest()。它接收流的流并始终复制发送给它的最新流的项目,取消以前的流。因此,它作为可以在其他流之间切换的流。

根据您的 getAll()search() 的声明方式,它可能是这样的:

private val state = MutableStateFlow<String?>(null)
private val messages = state.flatMapLatest {
    if (it == null) getAll() else search(it)
}

suspend fun requestSearch(search: String) = state.emit(search)
suspend fun requestAll() = state.emit(null)

fun getAll(): Flow<List<Message>> = TODO()
fun search(search: String): Flow<List<Message>> = TODO()

state代表我们目前需要的搜索状态。每当我们向它发出信号时,它都会请求 getAll()search() 流并将其复制为 messages 流。

然后,要将其与 Compose 一起使用,您需要将其转换为 State:

messages.collectAsState(emptyList())