Kotlin Coroutine Flow:使用 Flow 什么时候会出现资源浪费

Kotlin Coroutine Flow: When does wasting resource happen when using Flow

我正在阅读此 article 以在将其与我的实现进行比较时充分了解使用 Flow 的注意事项,但我无法清楚地掌握如何判断您在使用 Flow 或流量生成器。什么时候流 release/freed 在内存中,什么时候浪费资源,比如不小心创建多个流实例而不释放它们?

我有一个用例 class,它调用 returns Flow 的存储库函数。在我的 ViewModel 中,它是这样的。

class AssetViewModel constructor(private val getAssetsUseCase: GetAssetsUseCase) : BaseViewModel() {

    private var job: Job? = null

    private val _assetState = defaultMutableSharedFlow<AssetState>()

    fun getAssetState() = _assetState.asSharedFlow()

    init {
        job = viewModelScope.launch {
            while(true) {
                if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
                    fetchAssets()
                delay(10_000)
            }
        }
    }

    fun fetchAssets() {

        viewModelScope.launch {

            withContext(Dispatchers.IO) {
                getAssetsUseCase(
                    AppConfigs.ASSET_BASE_URL,
                    AppConfigs.ASSET_PARAMS,
                    AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
                ).onEach {

                    when(it){

                        is RequestStatus.Loading -> {
                            _assetState.tryEmit(AssetState.FetchLoading)
                        }

                        is RequestStatus.Success -> {
                            _assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
                        }

                        is RequestStatus.Failed -> {
                            _assetState.tryEmit(AssetState.FetchFailed(it.message))
                        }

                    }

                }.collect()
            }

        }

    }

    override fun onCleared() {
        job?.cancel()
        super.onCleared()
    }

}

这里的想法是我们每 10 秒从远程获取一次数据,同时还允许通过 UI.

按需获取数据

只是一个典型的无用用例class

class GetAssetsUseCase @Inject constructor(
    private val repository: AssetsRepository // Passing interface not implementation for fake test
) {

    operator fun invoke(baseUrl: String, query: String, limit: String): Flow<RequestStatus<AssetDomain>> {
        return repository.fetchAssets(baseUrl, query, limit)
    }

}

repository的具体实现

class AssetsRepositoryImpl constructor(
    private val service: CryptoService,
    private val mapper: AssetDtoMapper
) : AssetsRepository {

    override fun fetchAssets(
        baseUrl: String,
        query: String,
        limit: String
    ) = flow {

        try {
            emit(RequestStatus.Loading())
            val domainModel = mapper.mapToDomainModel(
                service.getAssetItems(
                    baseUrl,
                    query,
                    limit
                )
            )
            emit(RequestStatus.Success(domainModel))
        } catch (e: HttpException) {
            emit(RequestStatus.Failed(e))
        } catch (e: IOException) {
            emit(RequestStatus.Failed(e))
        }

    }

}

阅读此 article 后,它说使用 stateInsharedIn 会提高使用流程时的性能,看来我正在创建相同流程的新实例-要求。但是有一个限制,因为所述方法仅适用于变量而不适用于 returns Flow.

如果有多个观察者,

stateInshareIn 可以通过避免冗余获取来节省资源。在您的情况下,您可以将其设置为在没有观察者时自动暂停自动 re-fetching 。如果在 UI 端你使用 repeatOnLifecycle,那么当视图离开屏幕时它会自动丢弃你的观察者,然后你将避免用户永远不会看到的浪费的提取。

我认为这种描述并不常见,但通常多个观察者只是来自同一个 Activity 或片段 class 屏幕旋转或在片段之间快速切换后的观察者。如果您使用带有超时的 WhileSubscribed 来解决这个问题,您可以避免在需要时再次快速启动流程。

目前您从外部协同程序发送到而不是使用 shareIn,因此没有暂停执行的机会。

我还没有尝试创建支持自动和手动重新获取的东西。这是一个可能的策略,但我还没有测试过。

private val refreshRequest = Channel<Unit>(Channel.CONFLATED)

fun fetchAssets() {
    refreshRequest.trySend(Unit)
}

val assetState = flow {
    while(true) {
        getAssetsUseCase(
            AppConfigs.ASSET_BASE_URL,
            AppConfigs.ASSET_PARAMS,
            AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
        ).map {
            when(it){
                is RequestStatus.Loading -> AssetState.FetchLoading
                is RequestStatus.Success -> AssetState.FetchSuccess(it.data.assetDataDomain)
                is RequestStatus.Failed -> AssetState.FetchFailed(it.message)
            }
        }.emitAll()
        withTimeoutOrNull(100L) {
             // drop any immediate or pending manual request
             refreshRequest.receive()
        }
        // Wait until a fetch is manually requested or ten seconds pass:
        withTimeoutOrNull(10000L - 100L) {
             refreshRequest.receive()
        }
    }
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(4000L), replay = 1)

为此,我建议不要使用流作为用例函数的 return 类型,并且 api 调用不得包含在流生成器中。

为什么:

api 调用实际上发生一次,然后在由视图模型本身触发的时间间隔后再次发生,return 来自 api 调用函数的流将是一个功能强大的工具的错误使用实际上意味着被调用一次然后它必须是 self-reliant,它应该发出或泵入数据直到它有一个 subscriber/collector.

在房间数据库查询调用中使用流作为 return 类型时,您可以考虑一个用例,它只被调用一次,然后房间将数据发送到它,直到它有订阅者。

.....

fun fetchAssets() {
        viewModelScope.launch {
//          loading true
            val result=getusecase(.....)
            when(result){..process result and emit on state..}
//            loading false
        }
}

.....

suspend operator fun invoke(....):RequestStatus<AssetDomain>{
    repository.fetchAssets(baseUrl, query, limit)
}

.....

覆盖有趣的 fetchAssets( baseUrl:字符串, 请求参数, 限制:字符串 ):请求状态{

    try {
        //RequestStatus.Loading()//this can be managed in viewmodel itself
        val domainModel = mapper.mapToDomainModel(
            service.getAssetItems(
                baseUrl,
                query,
                limit
            )
        )
        RequestStatus.Success(domainModel)
    } catch (e: HttpException) {
        RequestStatus.Failed(e)
    } catch (e: IOException) {
        RequestStatus.Failed(e)
    }

}