Kotlin Flow:当 Fragment 变得不可见时取消订阅 SharedFlow
Kotlin Flow: unsubscribe from SharedFlow when Fragment becomes invisible
我读过类似的主题,但找不到合适的答案:
- StateFlow and SharedFlow. Making cold flows hot using shareIn - Android 文档
- Introduce SharedFlow - GH 讨论由 Roman Elizarov 发起
在我的 Repository
class 我感冒了 Flow
我想分享给 2 Presenters
/ViewModels
所以我的选择是使用shareIn
运算符。
让我们看一下 Android 文档的示例:
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope, // e.g. CoroutineScope(Dispatchers.IO)?
replay = 1,
started = SharingStarted.WhileSubscribed()
)
externalScope
参数的文档建议:
A CoroutineScope that is used to share the flow. This scope should live longer than any consumer to keep the shared flow alive as long as needed.
但是,在寻找有关如何停止订阅 Flow
的答案时,第二个 link 中投票最多的答案是:
A solution is not to cancel the flow, but the scope it's launched in.
对我来说,SharedFlow
的这些答案是矛盾的。不幸的是,我的 Presenter
/ViewModel
即使在其 onCleared
被调用后仍然收到最新数据。
如何预防?这是我如何在 Presenter
/ViewModel
:
中使用此 Flow
的示例
fun doSomethingUseful(): Flow<OtherModel> {
return repository.latestNews.map(OtherModel)
如果这可能有帮助,我正在使用 MVI 架构,因此 doSomethingUseful
会对用户创建的一些意图做出反应。
我已尝试提供一个带有相关评论的最小示例。如前所述,SharedFlow 的工作方式与 RxJava 中的 ConnectableObservable
非常相似。上游只会被订阅一次,这意味着计算只对冷的上游流进行一次。您的存储库什么都不做,因为它是一个冷流,在 SharedFlow
订阅之前永远不会“收集”,因此它没有范围。
同时使用了 RxJava 和 Flow,有很多相似之处。创建 Flow
和 Collector
接口似乎几乎没有必要,如果扩展了基本的 Reactive Streams 接口,开发人员可以更轻松地进行转换 - 但我不知道根本原因 -也许他们希望通过新的 api 获得更大的灵活性,或者从 Java 9 implmentation 和 RxJava.
等 Reactive Streams 实现中脱颖而出
class MyViewModel : ViewModel(), CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob() // optional + CoroutineExceptionHandler()
private val latestNews: Flow<List<String>> = doSomethingUseful()
.flowOn(Dispatchers.IO) // upstream will operate on this dispatch
.shareIn(scope = this, // shared in this scope - becomes hot flow (or use viewModelScope) for lifetime of your view model - will only connect to doSomethingUseful once for lifetime of scope
replay = 1,
started = SharingStarted.WhileSubscribed())
fun connect() : Flow<List<String>> = latestNews // expose SharedFlow to "n" number of subscribers or same subscriber more than once
override fun onCleared() {
super.onCleared()
cancel() // cancel the shared flow - this scope is finished
}
}
class MainActivity : AppCompatActivity(), CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob()
private var job : Job? = null
// supply the same view model instance on config changes for example - its scope is larger
private val vm : MyViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
override fun onStart() {
super.onStart()
job = launch {
vm.connect().collect {
// observe latest emission of hot flow and subsequent emissions if any - either reconnect or connect for first time
}
}
}
override fun onStop() {
super.onStop()
// cancel the job but latest news is still "alive" and receives emissions as it is running in a larger scope of this scope
job?.cancel()
}
override fun onDestroy() {
super.onDestroy()
// completely cancel this scope - the ViewModel scope is unaffected
cancel()
}
}
感谢 Mark Keen 的评论和 post 我想我得到了一个令人满意的结果。
我知道 shareIn
参数中定义的范围不必与我的消费者操作的范围相同。将 BasePresenter
/BaseViewModel
中的范围从 CoroutineScope
更改为 viewModelScope
似乎可以解决主要问题。您甚至不需要手动取消此范围,如 Android docs:
中所定义
init {
viewModelScope.launch {
// Coroutine that will be canceled when the ViewModel is cleared.
}
}
请记住,默认的 viewModelScope
调度程序是 Main
,这并不明显,它可能不是您想要的!要更改调度程序,请使用 viewModelScope.launch(YourDispatcher)
.
此外,我的热 SharedFlow
是从另一个冷 Flow
转换而来的 callbackFlow
回调 API (基于 Channels
API - 这很复杂...)
将收集范围更改为 viewModelScope
后,我在从 API 发出新数据时遇到 ChildCancelledException: Child of the scoped flow was cancelled
异常。这个问题在 GitHub:
的两个问题中都有详细记录
- kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
- SendChannel.offer should never throw
如前所述,使用 offer
和 send
的排放之间存在细微差别:
offer is for non-suspending context, while send is for suspending ones.
offer is, unfortunately, non-symmetric to send in terms of propagated exceptions (CancellationException from send is usually ignored, while CancellationException from offer in nom-suspending context is not).
We hope to fix it in #974 either with offerOrClosed or changing offer semantics
至于 Kotlin Coroutines of 1.4.2,#974 还没有修复 - 我希望它能在不久的将来避免意外 CancellationException
.
最后,我建议在 shareIn
运算符中使用 started
参数。在所有这些更改之后,我不得不在我的用例中从 WhileSubscribed()
更改为 Lazily
。
如果我找到任何新信息,我会更新此 post。希望我的研究能节省一些人的时间。
使用共享流。在下面的示例中,我从一个片段中发出值并将其收集到另一个片段中。
视图模型:
class MenuOptionsViewModel : ViewModel() {
private val _option = MutableSharedFlow<String>()
val option = _option.asSharedFlow()
suspend fun setOption(o : String){
_option.emit(o)
}
}
片段发射值:
class BottomSheetOptionsFragment : BottomSheetDialogFragment() , KodeinAware{
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
menuViewModel = activity?.run {
ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
} ?: throw Exception("Invalid Activity")
listViewOptions.adapter = ArrayAdapter<String>(
requireContext(),
R.layout.menu_text_item,
options
)
listViewOptions.setOnItemClickListener { adapterView, view, i, l ->
val entry: String = listViewOptions.getAdapter().getItem(i) as String
// here we are emitting values
GlobalScope.launch { menuViewModel.setOption(entry) }
Log.d(TAG, "emitting flow $entry")
dismiss()
}
}
}
片段收集值:
class DetailFragment : BaseFragment(), View.OnClickListener, KodeinAware,
OnItemClickListener {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
menuViewModel = activity?.run {
ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
} ?: throw Exception("Invalid Activity")
// collecting values
lifecycleScope.launchWhenStarted {
menuViewModel.option.collect {
Log.d(TAG, "collecting flow $it")
}
}
}
我读过类似的主题,但找不到合适的答案:
- StateFlow and SharedFlow. Making cold flows hot using shareIn - Android 文档
- Introduce SharedFlow - GH 讨论由 Roman Elizarov 发起
在我的 Repository
class 我感冒了 Flow
我想分享给 2 Presenters
/ViewModels
所以我的选择是使用shareIn
运算符。
让我们看一下 Android 文档的示例:
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope, // e.g. CoroutineScope(Dispatchers.IO)?
replay = 1,
started = SharingStarted.WhileSubscribed()
)
externalScope
参数的文档建议:
A CoroutineScope that is used to share the flow. This scope should live longer than any consumer to keep the shared flow alive as long as needed.
但是,在寻找有关如何停止订阅 Flow
的答案时,第二个 link 中投票最多的答案是:
A solution is not to cancel the flow, but the scope it's launched in.
对我来说,SharedFlow
的这些答案是矛盾的。不幸的是,我的 Presenter
/ViewModel
即使在其 onCleared
被调用后仍然收到最新数据。
如何预防?这是我如何在 Presenter
/ViewModel
:
Flow
的示例
fun doSomethingUseful(): Flow<OtherModel> {
return repository.latestNews.map(OtherModel)
如果这可能有帮助,我正在使用 MVI 架构,因此 doSomethingUseful
会对用户创建的一些意图做出反应。
我已尝试提供一个带有相关评论的最小示例。如前所述,SharedFlow 的工作方式与 RxJava 中的 ConnectableObservable
非常相似。上游只会被订阅一次,这意味着计算只对冷的上游流进行一次。您的存储库什么都不做,因为它是一个冷流,在 SharedFlow
订阅之前永远不会“收集”,因此它没有范围。
同时使用了 RxJava 和 Flow,有很多相似之处。创建 Flow
和 Collector
接口似乎几乎没有必要,如果扩展了基本的 Reactive Streams 接口,开发人员可以更轻松地进行转换 - 但我不知道根本原因 -也许他们希望通过新的 api 获得更大的灵活性,或者从 Java 9 implmentation 和 RxJava.
class MyViewModel : ViewModel(), CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob() // optional + CoroutineExceptionHandler()
private val latestNews: Flow<List<String>> = doSomethingUseful()
.flowOn(Dispatchers.IO) // upstream will operate on this dispatch
.shareIn(scope = this, // shared in this scope - becomes hot flow (or use viewModelScope) for lifetime of your view model - will only connect to doSomethingUseful once for lifetime of scope
replay = 1,
started = SharingStarted.WhileSubscribed())
fun connect() : Flow<List<String>> = latestNews // expose SharedFlow to "n" number of subscribers or same subscriber more than once
override fun onCleared() {
super.onCleared()
cancel() // cancel the shared flow - this scope is finished
}
}
class MainActivity : AppCompatActivity(), CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob()
private var job : Job? = null
// supply the same view model instance on config changes for example - its scope is larger
private val vm : MyViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
override fun onStart() {
super.onStart()
job = launch {
vm.connect().collect {
// observe latest emission of hot flow and subsequent emissions if any - either reconnect or connect for first time
}
}
}
override fun onStop() {
super.onStop()
// cancel the job but latest news is still "alive" and receives emissions as it is running in a larger scope of this scope
job?.cancel()
}
override fun onDestroy() {
super.onDestroy()
// completely cancel this scope - the ViewModel scope is unaffected
cancel()
}
}
感谢 Mark Keen 的评论和 post 我想我得到了一个令人满意的结果。
我知道 shareIn
参数中定义的范围不必与我的消费者操作的范围相同。将 BasePresenter
/BaseViewModel
中的范围从 CoroutineScope
更改为 viewModelScope
似乎可以解决主要问题。您甚至不需要手动取消此范围,如 Android docs:
init {
viewModelScope.launch {
// Coroutine that will be canceled when the ViewModel is cleared.
}
}
请记住,默认的 viewModelScope
调度程序是 Main
,这并不明显,它可能不是您想要的!要更改调度程序,请使用 viewModelScope.launch(YourDispatcher)
.
此外,我的热 SharedFlow
是从另一个冷 Flow
转换而来的 callbackFlow
回调 API (基于 Channels
API - 这很复杂...)
将收集范围更改为 viewModelScope
后,我在从 API 发出新数据时遇到 ChildCancelledException: Child of the scoped flow was cancelled
异常。这个问题在 GitHub:
- kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
- SendChannel.offer should never throw
如前所述,使用 offer
和 send
的排放之间存在细微差别:
offer is for non-suspending context, while send is for suspending ones.
offer is, unfortunately, non-symmetric to send in terms of propagated exceptions (CancellationException from send is usually ignored, while CancellationException from offer in nom-suspending context is not).
We hope to fix it in #974 either with offerOrClosed or changing offer semantics
至于 Kotlin Coroutines of 1.4.2,#974 还没有修复 - 我希望它能在不久的将来避免意外 CancellationException
.
最后,我建议在 shareIn
运算符中使用 started
参数。在所有这些更改之后,我不得不在我的用例中从 WhileSubscribed()
更改为 Lazily
。
如果我找到任何新信息,我会更新此 post。希望我的研究能节省一些人的时间。
使用共享流。在下面的示例中,我从一个片段中发出值并将其收集到另一个片段中。
视图模型:
class MenuOptionsViewModel : ViewModel() {
private val _option = MutableSharedFlow<String>()
val option = _option.asSharedFlow()
suspend fun setOption(o : String){
_option.emit(o)
}
}
片段发射值:
class BottomSheetOptionsFragment : BottomSheetDialogFragment() , KodeinAware{
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
menuViewModel = activity?.run {
ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
} ?: throw Exception("Invalid Activity")
listViewOptions.adapter = ArrayAdapter<String>(
requireContext(),
R.layout.menu_text_item,
options
)
listViewOptions.setOnItemClickListener { adapterView, view, i, l ->
val entry: String = listViewOptions.getAdapter().getItem(i) as String
// here we are emitting values
GlobalScope.launch { menuViewModel.setOption(entry) }
Log.d(TAG, "emitting flow $entry")
dismiss()
}
}
}
片段收集值:
class DetailFragment : BaseFragment(), View.OnClickListener, KodeinAware,
OnItemClickListener {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
menuViewModel = activity?.run {
ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
} ?: throw Exception("Invalid Activity")
// collecting values
lifecycleScope.launchWhenStarted {
menuViewModel.option.collect {
Log.d(TAG, "collecting flow $it")
}
}
}