Kotlin: CoroutineScope is collecting data even after flow is cancelled

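I am trying to launch a coroutine inside my PagingSource to watch how long the paging source has been trying to fetch my data. The only problem is that my coroutine is somehow still collecting data even after I stop my shopPagingWatcher Flow. As a result, it throws IOException("No Internet Exception") even though it should not.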

I am launching a coroutine because watching the state should not block the main flow of my paging source.

PagingSource

class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    @InternalCoroutinesApi
    override suspend fun load(params: LoadParams<QuerySnapshot>): LoadResult<QuerySnapshot, Product> {
        return try {
            // Launch Async Coroutine, Observe State, throw IO Exception when not loaded within 5 seconds
            shopPagingWatcher.start()
            CoroutineScope(Dispatchers.IO).launch {
                shopPagingWatcher.observeMaxTimeReached().collect { maxTimeReached ->
                    if (maxTimeReached) {
                        Timber.d("Mediator failed")
                        throw IOException("No Internet Exception")
                    }
                }
            }

            val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .get()
                .await()

            val lastDocumentSnapShot = currentPage.documents[currentPage.size() - 1]

            val nextPage = db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .startAfter(lastDocumentSnapShot)
                .get()
                .await()

            // When PagingSource is here, it successfully loaded currentPage and nextPage, therefore stop Watcher
            Timber.d("Mediator Sucessfull")
            shopPagingWatcher.stop()

            LoadResult.Page(
                data = currentPage.toObjects(),
                prevKey = null,
                nextKey = nextPage
            )

        } catch (e: Exception) {
            // IOException should be caught here, but it is not! The app crashed instead!
            Timber.d("Mediator Exception ist $e")
            LoadResult.Error(e)
        }
    }
}

ShopPagingWatcher

@Singleton
class ShopPagingWatcher @Inject constructor() : Workwatcher()

Abstract WorkWatcher

abstract class Workwatcher {
    private companion object {
        private val dispatcher = Dispatchers.IO
        private var timeStamp by Delegates.notNull<Long>()

        private var running = false
        private var manuallyStopped = false
        private var finished = false

        private const val maxTime: Long = 5000000000L
    }

    // Push the current timestamp, set running to true
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun start() = synchronized(dispatcher) {
        timeStamp = System.nanoTime()
        running = true
        manuallyStopped = false
        finished = false
    }


    // Manually stop the WorkerHelper 
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun stop()  = synchronized(dispatcher) {
        running = false
        manuallyStopped = true
        finished = true
        Timber.d("Mediator stopped")
    }

    // Function that observes the time
    fun observeMaxTimeReached(): Flow<Boolean> = flow {
        // Check if maxTime is not passed with → (System.nanoTime() - timeStamp) <= maxTime
        while (running && !finished && !manuallyStopped && (System.nanoTime() - timeStamp) <= maxTime) {
            emit(false)
            Timber.d("Currenttime is smaller, everything fine")
        }
        // This will be executed only when the Worker is running longer than maxTime
        if (!manuallyStopped && !finished) {
            Timber.d("Currenttime bigger, yikes. Stop worker")
            emit(true)
            running = false
            finished = true
            return@flow
        } else if (finished || manuallyStopped) {
            return@flow
        }
    }.flowOn(dispatcher)
}

How should I change the coroutine inside my PagingSource to achieve my goal? Note that Timber.d("Mediator stopped") does get called.

I appreciate any help, thank you.

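Do you need to measure a duration? Time already passes everywhere; you do not need another thread or coroutine to track it. measureNanoTime {} measures how long a block of code takes to execute.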
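As a minimal sketch of that idea: the block below times a piece of work with measureNanoTime from the Kotlin standard library. The function simulatedWork is a hypothetical stand-in for the real fetch and is not part of the code in the question.

```kotlin
import kotlin.system.measureNanoTime

// Hypothetical stand-in for the real work (e.g. fetching a page); assumed for illustration.
fun simulatedWork(): Int {
    Thread.sleep(50) // pretend the work takes roughly 50 ms
    return 42
}

fun main() {
    var result = 0
    // measureNanoTime runs the block once and returns the elapsed time in nanoseconds.
    val elapsedNanos = measureNanoTime {
        result = simulatedWork()
    }
    println("result=$result, took ${elapsedNanos / 1_000_000} ms")
}
```

No extra coroutine or shared flag is involved here; the measurement happens on whatever thread runs the block.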

Do you need to apply a timeout inside a suspend function? withTimeout is made exactly for that. Example:

class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    @InternalCoroutinesApi
    override suspend fun load(
        params: LoadParams<QuerySnapshot>
    ): LoadResult<QuerySnapshot, Product> {
        return try {
            withTimeout(5_000L) {                      // <<<<<<<<<< timeout in milliseconds
                val currentPage = ...
                val nextPage = ...
                LoadResult.Page(
                    data = currentPage.toObjects(),
                    prevKey = null,
                    nextKey = nextPage
                )
            }
        } catch (e: IOException) {
            LoadResult.Error(e)
        } catch (e: TimeoutCancellationException) {    // <<<<<<<<<<
            LoadResult.Error(e)
        }
    }
}
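To try this outside of Paging, here is a small self-contained sketch, assuming kotlinx.coroutines is on the classpath. The names fetchPage and loadWithTimeout are hypothetical and only simulate a slow call with delay. It also shows why the catch works here: withTimeout throws TimeoutCancellationException in the calling coroutine, whereas in the question the IOException is thrown inside a coroutine launched in a separate CoroutineScope, so the try/catch around load never sees it.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical stand-in for a slow network call; assumed for illustration.
suspend fun fetchPage(delayMillis: Long): String {
    delay(delayMillis) // suspends without blocking a thread and is cancellable
    return "page"
}

// Returns the fetched value, or a fallback string when the timeout elapses first.
suspend fun loadWithTimeout(delayMillis: Long, timeoutMillis: Long): String =
    try {
        withTimeout(timeoutMillis) { fetchPage(delayMillis) }
    } catch (e: TimeoutCancellationException) {
        // The timeout is raised in this coroutine, so an ordinary catch handles it.
        "timed out"
    }

fun main() = runBlocking {
    println(loadWithTimeout(delayMillis = 10, timeoutMillis = 1_000))  // prints "page"
    println(loadWithTimeout(delayMillis = 1_000, timeoutMillis = 50)) // prints "timed out"
}
```

If you would rather get a null than an exception, withTimeoutOrNull takes the same arguments and returns null when the time runs out.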