Kotlin Flow 和 Websockets 在 Android 上具有干净的架构
Kotlin Flow and Websockets with clean architecture on Android
最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道没有它怎么办。所以,我们尝试了著名的 Kotlin Flow
但我不知道我们的实现是否正确。
我们应用的架构分为四层:
- 服务 - 从套接字发出和接收事件,
- 存储库 - 过滤器、映射、转换等,
- ViewModel - 填充 LiveData
- Activity - 观察变化并更新UI.
因此,我们监听接收到服务中的事件如下:
fun listenMessages(): Flow<List<Message>> = channelFlow {
socket.on("NewMessage") { args ->
val message = gson.fromJson(args[0].toString(), ...)
trySend(message)
}
awaitClose()
}
当使用 trySend
接收到事件时,我们使用 channelFlow
的协程发送给消费者,并且我们使用 awaitClose
使这个 Flow
保持活动状态。
存储库在捕获 Flow
后执行一些逻辑并将其发送回 ViewModel:
fun getMessages(): Flow<List<Message>> {
return service.listenMessages()
.filter { ... }
.map { ... }
}
然后,ViewModel 在收集 Flow
:
时启动协程并更新 LiveData
fun getMessages() {
viewModelScope.launch(context = Dispatcher.IO) {
repository.getMessages()
.collect {
messagesLiveData.postValue(it)
}
}
}
这很好用,但是这会引发一些问题:
- 这是正确的实施方式吗?
- 当我们需要不断倾听时,
channelFlow
是正确的选择吗?
- 在这种情况下,我们是否应该使用经典的
Channel
s 而不是 Flow
(hot vs cold )?
提前感谢您的建议。
通道的概念是作为主要在协同程序之间进行通信的原语 1. As far as I understand it based on the channelFlow
docs,它在底层使用 Channel
并将其转换为 Flow
。通过在引擎盖下使用 Channel
,有一些重要的事情需要意识到:
every value that is sent to the channel is received once. You cannot use channels to distribute events or state updates in a way that allows multiple subscribers to independently receive and react upon them. 1
根据您的体系结构,这可能无关紧要,但这个示例可能会说明一些重要的事情:
fun ws(): Flow<List<Message>> = channelFlow {
println("channelFlow block called")
trySend(listOf(Message(0), Message(1)))
delay(2_000)
trySend(listOf(Message(2)))
}
fun main() = runBlocking {
val source = ws()
launch {
source.collect {
println("First collect got $it")
}
}
launch {
source.collect {
println("Second collect got $it")
}
}
}
生成输出:
channelFlow block called
channelFlow block called
First collect got [Message(id=0), Message(id=1)]
Second collect got [Message(id=0), Message(id=1)]
First collect got [Message(id=2)]
Second collect got [Message(id=2)]
由于Channel
无法共享,每次.collect
在source
上被调用,都会触发channelFlow
主体被调用!更令人惊讶的是,通过 source
引用了相同的 Flow
,并没有对 ws()
进行新的调用。
仔细查看 channelFlow
文档,您会发现
The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.2
由于 .collect()
是终端操作员,它会触发对 channelFlow
正文的新调用。
现在,这对您的用例重要吗?我不确定;您将必须弄清楚您是否在多个位置订阅了 listenMessages
产生的流,或者 channelFlow
的正文执行多次的成本是否很高。
作为更一般的建议,我建议更明确地说明您的服务行为。 listenMessages
应该发送给所有订阅的人吗?它是否应该只发送给第一个可用的(另请参阅 fan-out behavior)? If you prefer the former, a SharedFlow
将被推荐,如果稍后我会明确地直接公开 Channel
这样你就不会混淆下游消费者关于这个问题我在上面展示了。
最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道没有它怎么办。所以,我们尝试了著名的 Kotlin Flow
但我不知道我们的实现是否正确。
我们应用的架构分为四层:
- 服务 - 从套接字发出和接收事件,
- 存储库 - 过滤器、映射、转换等,
- ViewModel - 填充 LiveData
- Activity - 观察变化并更新UI.
因此,我们监听接收到服务中的事件如下:
fun listenMessages(): Flow<List<Message>> = channelFlow {
socket.on("NewMessage") { args ->
val message = gson.fromJson(args[0].toString(), ...)
trySend(message)
}
awaitClose()
}
当使用 trySend
接收到事件时,我们使用 channelFlow
的协程发送给消费者,并且我们使用 awaitClose
使这个 Flow
保持活动状态。
存储库在捕获 Flow
后执行一些逻辑并将其发送回 ViewModel:
fun getMessages(): Flow<List<Message>> {
return service.listenMessages()
.filter { ... }
.map { ... }
}
然后,ViewModel 在收集 Flow
:
fun getMessages() {
viewModelScope.launch(context = Dispatcher.IO) {
repository.getMessages()
.collect {
messagesLiveData.postValue(it)
}
}
}
这很好用,但是这会引发一些问题:
- 这是正确的实施方式吗?
- 当我们需要不断倾听时,
channelFlow
是正确的选择吗? - 在这种情况下,我们是否应该使用经典的
Channel
s 而不是Flow
(hot vs cold )?
提前感谢您的建议。
通道的概念是作为主要在协同程序之间进行通信的原语 1. As far as I understand it based on the channelFlow
docs,它在底层使用 Channel
并将其转换为 Flow
。通过在引擎盖下使用 Channel
,有一些重要的事情需要意识到:
every value that is sent to the channel is received once. You cannot use channels to distribute events or state updates in a way that allows multiple subscribers to independently receive and react upon them. 1
根据您的体系结构,这可能无关紧要,但这个示例可能会说明一些重要的事情:
fun ws(): Flow<List<Message>> = channelFlow {
println("channelFlow block called")
trySend(listOf(Message(0), Message(1)))
delay(2_000)
trySend(listOf(Message(2)))
}
fun main() = runBlocking {
val source = ws()
launch {
source.collect {
println("First collect got $it")
}
}
launch {
source.collect {
println("Second collect got $it")
}
}
}
生成输出:
channelFlow block called
channelFlow block called
First collect got [Message(id=0), Message(id=1)]
Second collect got [Message(id=0), Message(id=1)]
First collect got [Message(id=2)]
Second collect got [Message(id=2)]
由于Channel
无法共享,每次.collect
在source
上被调用,都会触发channelFlow
主体被调用!更令人惊讶的是,通过 source
引用了相同的 Flow
,并没有对 ws()
进行新的调用。
仔细查看 channelFlow
文档,您会发现
The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.2
由于 .collect()
是终端操作员,它会触发对 channelFlow
正文的新调用。
现在,这对您的用例重要吗?我不确定;您将必须弄清楚您是否在多个位置订阅了 listenMessages
产生的流,或者 channelFlow
的正文执行多次的成本是否很高。
作为更一般的建议,我建议更明确地说明您的服务行为。 listenMessages
应该发送给所有订阅的人吗?它是否应该只发送给第一个可用的(另请参阅 fan-out behavior)? If you prefer the former, a SharedFlow
将被推荐,如果稍后我会明确地直接公开 Channel
这样你就不会混淆下游消费者关于这个问题我在上面展示了。