Kotlin Flow 和 Websockets 在 Android 上具有干净的架构

Kotlin Flow and Websockets with clean architecture on Android

最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道没有它怎么办。所以,我们尝试了著名的 Kotlin Flow 但我不知道我们的实现是否正确。

我们应用的架构分为四层:

因此,我们监听接收到服务中的事件如下:

fun listenMessages(): Flow<List<Message>> = channelFlow {
    socket.on("NewMessage") { args ->
        val message = gson.fromJson(args[0].toString(), ...)
        trySend(message)
    }
    awaitClose()
}

当使用 trySend 接收到事件时,我们使用 channelFlow 的协程发送给消费者,并且我们使用 awaitClose 使这个 Flow 保持活动状态。

存储库在捕获 Flow 后执行一些逻辑并将其发送回 ViewModel:

fun getMessages(): Flow<List<Message>> {
    return service.listenMessages()
            .filter { ... }
            .map { ... }
}

然后,ViewModel 在收集 Flow:

时启动协程并更新 LiveData
fun getMessages() {
    viewModelScope.launch(context = Dispatcher.IO) {
        repository.getMessages()
                .collect {
                    messagesLiveData.postValue(it)
                }
    }
}

这很好用,但是这会引发一些问题:

提前感谢您的建议。

通道的概念是作为主要在协同程序之间进行通信的原语 1. As far as I understand it based on the channelFlow docs,它在底层使用 Channel 并将其转换为 Flow。通过在引擎盖下使用 Channel,有一些重要的事情需要意识到:

every value that is sent to the channel is received once. You cannot use channels to distribute events or state updates in a way that allows multiple subscribers to independently receive and react upon them. 1

根据您的体系结构,这可能无关紧要,但这个示例可能会说明一些重要的事情:

fun ws(): Flow<List<Message>> = channelFlow {
    println("channelFlow block called")
    trySend(listOf(Message(0), Message(1)))
    delay(2_000)
    trySend(listOf(Message(2)))
}

fun main() = runBlocking {
    val source = ws()

    launch {
        source.collect {
            println("First collect got $it")
        }
    }
    launch {
        source.collect {
            println("Second collect got $it")
        }
    }
}

生成输出:

channelFlow block called
channelFlow block called
First collect got [Message(id=0), Message(id=1)]
Second collect got [Message(id=0), Message(id=1)]
First collect got [Message(id=2)]
Second collect got [Message(id=2)]

由于Channel无法共享,每次.collectsource上被调用,都会触发channelFlow主体被调用!更令人惊讶的是,通过 source 引用了相同的 Flow,并没有对 ws() 进行新的调用。

仔细查看 channelFlow 文档,您会发现

The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.2

由于 .collect() 是终端操作员,它会触发对 channelFlow 正文的新调用。

现在,这对您的用例重要吗?我不确定;您将必须弄清楚您是否在多个位置订阅了 listenMessages 产生的流,或者 channelFlow 的正文执行多次的成本是否很高。

作为更一般的建议,我建议更明确地说明您的服务行为。 listenMessages 应该发送给所有订阅的人吗?它是否应该只发送给第一个可用的(另请参阅 fan-out behavior)? If you prefer the former, a SharedFlow 将被推荐,如果稍后我会明确地直接公开 Channel 这样你就不会混淆下游消费者关于这个问题我在上面展示了。