Kotlin Coroutine Flow:限制收集器的数量
Kotlin Coroutine Flow: Limit the number of collector
有没有办法限制 returns 使用流生成器的流的函数中的收集器数量?
我在 ViewModel
中有这个 public 方法
fun fetchAssets(limit: String) {
viewModelScope.launch {
withContext(Dispatchers.IO){
getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {
when (it) {
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
此方法在 ViewModel 的 init
块上调用,但也可以在 UI.
上手动调用
此流每 10 秒发出一次值。
存储库
override fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
emit(RequestStatus.Loading())
val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
emit(RequestStatus.Success(domainModel))
} catch (e: HttpException) {
emit(RequestStatus.Failed(e))
} catch (e: IOException) {
emit(RequestStatus.Failed(e))
}
delay(10_000)
}
}
不幸的是,每次从 UI 调用 fetch()
时,我注意到它会创建另一个收集器,因此最终会产生大量收集器,这非常糟糕且不正确。
这个想法是让一个流每 10 秒发出一次值,但也可以通过 UI 手动调用以立即更新数据,而无需多个收集器。
您似乎误解了收集流量的含义或者您误用了收集操作。通过收集流量,我们的意思是我们观察它的变化。但是你尝试用collect()
来引入流程的变化,这确实行不通。它只是在后台启动另一个流程。
您应该只收集一次流量,因此请将其保存在 init
内或适合您的情况的任何地方。然后你需要更新流程的逻辑,以使其能够触发按需重新加载。有很多方法可以做到这一点,解决方案会有所不同,具体取决于您是否需要在手动更新时重置计时器。例如,我们可以使用通道通知流需要重新加载:
val reloadChannel = Channel<Unit>(Channel.CONFLATED)
fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
...
}
withTimeoutOrNull(10.seconds) { reloadChannel.receive() } // replace `delay()` with this
}
}
fun reload() {
reloadChannel.trySend(Unit)
}
每当您需要触发手动重新加载时,不要启动另一个流程或调用另一个 collect()
操作,而只需调用 reload()
。然后已经被收集的流将开始重新加载并发出状态变化。
此解决方案会在手动重新加载时重置计时器,我认为这对用户体验更好。
我最终在 ViewModel 上移动了计时器,因为我可以请求按需获取,同时也没有同时运行的多个收集器。
private var job: Job? = null
private val _assetState = defaultMutableSharedFlow<AssetState>()
fun getAssetState() = _assetState.asSharedFlow()
init {
job = viewModelScope.launch {
while(true) {
if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
fetchAssets()
delay(10_000)
}
}
}
fun fetchAssets() {
viewModelScope.launch {
withContext(Dispatchers.IO) {
getAssetsUseCase(
AppConfigs.ASSET_BASE_URL,
AppConfigs.ASSET_PARAMS,
AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
).onEach {
when(it){
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
override fun onCleared() {
job?.cancel()
super.onCleared()
}
如果这是代码味道,请指正。
有没有办法限制 returns 使用流生成器的流的函数中的收集器数量?
我在 ViewModel
中有这个 public 方法fun fetchAssets(limit: String) {
viewModelScope.launch {
withContext(Dispatchers.IO){
getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {
when (it) {
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
此方法在 ViewModel 的 init
块上调用,但也可以在 UI.
此流每 10 秒发出一次值。
存储库
override fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
emit(RequestStatus.Loading())
val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
emit(RequestStatus.Success(domainModel))
} catch (e: HttpException) {
emit(RequestStatus.Failed(e))
} catch (e: IOException) {
emit(RequestStatus.Failed(e))
}
delay(10_000)
}
}
不幸的是,每次从 UI 调用 fetch()
时,我注意到它会创建另一个收集器,因此最终会产生大量收集器,这非常糟糕且不正确。
这个想法是让一个流每 10 秒发出一次值,但也可以通过 UI 手动调用以立即更新数据,而无需多个收集器。
您似乎误解了收集流量的含义或者您误用了收集操作。通过收集流量,我们的意思是我们观察它的变化。但是你尝试用collect()
来引入流程的变化,这确实行不通。它只是在后台启动另一个流程。
您应该只收集一次流量,因此请将其保存在 init
内或适合您的情况的任何地方。然后你需要更新流程的逻辑,以使其能够触发按需重新加载。有很多方法可以做到这一点,解决方案会有所不同,具体取决于您是否需要在手动更新时重置计时器。例如,我们可以使用通道通知流需要重新加载:
val reloadChannel = Channel<Unit>(Channel.CONFLATED)
fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
...
}
withTimeoutOrNull(10.seconds) { reloadChannel.receive() } // replace `delay()` with this
}
}
fun reload() {
reloadChannel.trySend(Unit)
}
每当您需要触发手动重新加载时,不要启动另一个流程或调用另一个 collect()
操作,而只需调用 reload()
。然后已经被收集的流将开始重新加载并发出状态变化。
此解决方案会在手动重新加载时重置计时器,我认为这对用户体验更好。
我最终在 ViewModel 上移动了计时器,因为我可以请求按需获取,同时也没有同时运行的多个收集器。
private var job: Job? = null
private val _assetState = defaultMutableSharedFlow<AssetState>()
fun getAssetState() = _assetState.asSharedFlow()
init {
job = viewModelScope.launch {
while(true) {
if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
fetchAssets()
delay(10_000)
}
}
}
fun fetchAssets() {
viewModelScope.launch {
withContext(Dispatchers.IO) {
getAssetsUseCase(
AppConfigs.ASSET_BASE_URL,
AppConfigs.ASSET_PARAMS,
AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
).onEach {
when(it){
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
override fun onCleared() {
job?.cancel()
super.onCleared()
}
如果这是代码味道,请指正。