Kotlin Coroutine Flow:限制收集器的数量

Kotlin Coroutine Flow: Limit the number of collector

有没有办法限制 returns 使用流生成器的流的函数中的收集器数量?

我在 ViewModel

中有这个 public 方法
fun fetchAssets(limit: String) {

        viewModelScope.launch {

            withContext(Dispatchers.IO){
                getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {

                    when (it) {

                        is RequestStatus.Loading -> {
                            _assetState.tryEmit(AssetState.FetchLoading)
                        }

                        is RequestStatus.Success -> {
                            _assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
                        }

                        is RequestStatus.Failed -> {
                            _assetState.tryEmit(AssetState.FetchFailed(it.message))
                        }

                    }

                }.collect()
            }

        }

    }

此方法在 ViewModel 的 init 块上调用,但也可以在 UI.

上手动调用

此流每 10 秒发出一次值。

存储库

override fun fetchAssets(
        query: String,
        limit: String
    ) = flow {
        while (true) {
            try {
                interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
                emit(RequestStatus.Loading())
                val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
                emit(RequestStatus.Success(domainModel))
            } catch (e: HttpException) {
                emit(RequestStatus.Failed(e))
            } catch (e: IOException) {
                emit(RequestStatus.Failed(e))
            }
            delay(10_000)
        }
    }

不幸的是,每次从 UI 调用 fetch() 时,我注意到它会创建另一个收集器,因此最终会产生大量收集器,这非常糟糕且不正确。

这个想法是让一个流每 10 秒发出一次值,但也可以通过 UI 手动调用以立即更新数据,而无需多个收集器。

您似乎误解了收集流量的含义或者您误用了收集操作。通过收集流量,我们的意思是我们观察它的变化。但是你尝试用collect()来引入流程的变化,这确实行不通。它只是在后台启动另一个流程。

您应该只收集一次流量,因此请将其保存在 init 内或适合您的情况的任何地方。然后你需要更新流程的逻辑,以使其能够触发按需重新加载。有很多方法可以做到这一点,解决方案会有所不同,具体取决于您是否需要在手动更新时重置计时器。例如,我们可以使用通道通知流需要重新加载:

val reloadChannel = Channel<Unit>(Channel.CONFLATED)

fun fetchAssets(
    query: String,
    limit: String
) = flow {
    while (true) {
        try {
            ...
        }
        
        withTimeoutOrNull(10.seconds) { reloadChannel.receive() } // replace `delay()` with this
    }
}

fun reload() {
    reloadChannel.trySend(Unit)
}

每当您需要触发手动重新加载时,不要启动另一个流程或调用另一个 collect() 操作,而只需调用 reload()。然后已经被收集的流将开始重新加载并发出状态变化。

此解决方案会在手动重新加载时重置计时器,我认为这对用户体验更好。

我最终在 ViewModel 上移动了计时器,因为我可以请求按需获取,同时也没有同时运行的多个收集器。

private var job: Job? = null

    private val _assetState = defaultMutableSharedFlow<AssetState>()

    fun getAssetState() = _assetState.asSharedFlow()

    init {
        job = viewModelScope.launch {
            while(true) {
                if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
                    fetchAssets()
                delay(10_000)
            }
        }
    }

    fun fetchAssets() {

        viewModelScope.launch {

            withContext(Dispatchers.IO) {
                getAssetsUseCase(
                    AppConfigs.ASSET_BASE_URL,
                    AppConfigs.ASSET_PARAMS,
                    AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
                ).onEach {

                    when(it){

                        is RequestStatus.Loading -> {
                            _assetState.tryEmit(AssetState.FetchLoading)
                        }

                        is RequestStatus.Success -> {
                            _assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
                        }

                        is RequestStatus.Failed -> {
                            _assetState.tryEmit(AssetState.FetchFailed(it.message))
                        }

                    }

                }.collect()
            }

        }

    }

    override fun onCleared() {
        job?.cancel()
        super.onCleared()
    }

如果这是代码味道,请指正。