Kotlin:即使在 Flow 被取消之后,CoroutineScope 仍在收集数据
Kotlin: CoroutineScope is collecting data even after flow is cancelled
我正在尝试在我的 PagingSource
中启动一个 Coroutine,
用来监视分页源获取数据已经花了多长时间。我在这里遇到的唯一问题是,即使在我停止 shopPagingWatcher 的 Flow
之后,我的 Coroutine
仍然不知为何在继续收集数据。因此,它会抛出 IOException("No Internet Exception"),
尽管它本不应该抛出。
我之所以启动一个 Coroutine,
是因为监视状态不应该阻塞我的 paging source
的主流程。
PagingSource
/**
 * Paging source that loads [Product] pages from the Firestore collection [FIREBASE_PRODUCTS],
 * using the [QuerySnapshot] of the page to display as the paging key.
 *
 * NOTE(review): despite the "RemoteMediator" name, this class extends [PagingSource].
 */
class ShopRemoteMediator @Inject constructor(
private val db: FirebaseFirestore,
private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {
/**
 * Loads the page given by `params.key` (or queries the first page when the key is null) and
 * also fetches the page that follows it, which is returned as `nextKey`.
 *
 * Exceptions thrown directly inside the `try` body are returned as [LoadResult.Error].
 */
@InternalCoroutinesApi
override suspend fun load(params: LoadParams<QuerySnapshot>): LoadResult<QuerySnapshot, Product> {
return try {
// Launch Async Coroutine, Observe State, throw IO Exception when not loaded within 5 seconds
shopPagingWatcher.start()
// NOTE(review): this coroutine is launched in a brand-new CoroutineScope, so it is not a child
// of the coroutine running load(). An exception thrown inside it is not delivered to the
// surrounding try/catch, and nothing in load() cancels the returned Job.
CoroutineScope(Dispatchers.IO).launch {
shopPagingWatcher.observeMaxTimeReached().collect { maxTimeReached ->
if (maxTimeReached) {
Timber.d("Mediator failed")
throw IOException("No Internet Exception")
}
}
}
// Use the snapshot passed as key, or query the first page when there is no key yet.
val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
.limit(SHOP_LIST_LIMIT)
.get()
.await()
// NOTE(review): when currentPage is empty this indexes documents[-1] — confirm empty pages cannot occur.
val lastDocumentSnapShot = currentPage.documents[currentPage.size() - 1]
// Query the page that starts after the last document of the current page.
val nextPage = db.collection(FIREBASE_PRODUCTS)
.limit(SHOP_LIST_LIMIT)
.startAfter(lastDocumentSnapShot)
.get()
.await()
// When PagingSource is here, it successfully loaded currentPage and nextPage, therefore stop Watcher
// NOTE(review): stop() is only reached on this success path; it is skipped when an exception is thrown above.
Timber.d("Mediator Sucessfull")
shopPagingWatcher.stop()
LoadResult.Page(
data = currentPage.toObjects(),
prevKey = null,
nextKey = nextPage
)
} catch (e: Exception) {
// IOException should be caught here, but it is not! The app crashed instead!
Timber.d("Mediator Exception ist $e")
LoadResult.Error(e)
}
}
}
ShopPagingWatcher
/** Injectable singleton [Workwatcher] handed to the shop paging source; it adds no members of its own. */
@Singleton
class ShopPagingWatcher @Inject constructor() : Workwatcher()
抽象类 Workwatcher
/**
 * Base class for a stopwatch-style watcher that reports, through a [Flow], whether a unit of
 * work has been running for longer than `maxTime`.
 *
 * NOTE(review): every field lives in the private companion object, so the timestamp and the
 * flags are shared by all instances and subclasses rather than being per-watcher state.
 */
abstract class Workwatcher {
private companion object {
// Dispatcher the flow body runs on (see flowOn below); also used as the lock object in start()/stop().
private val dispatcher = Dispatchers.IO
// System.nanoTime() value captured by start(); reading it before it has been assigned throws IllegalStateException.
private var timeStamp by Delegates.notNull<Long>()
private var running = false
private var manuallyStopped = false
private var finished = false
// Limit in nanoseconds (it is compared against System.nanoTime() differences): 5_000_000_000 ns = 5 s.
private const val maxTime: Long = 5000000000L
}
// Push the current timestamp, set running to true
// I don't know if it is necessary to use "synchronized"
@InternalCoroutinesApi
fun start() = synchronized(dispatcher) {
timeStamp = System.nanoTime()
running = true
manuallyStopped = false
finished = false
}
// Manually stop the WorkerHelper
// I don't know if it is necessary to use "synchronized"
@InternalCoroutinesApi
fun stop() = synchronized(dispatcher) {
running = false
manuallyStopped = true
finished = true
Timber.d("Mediator stopped")
}
// Function that observes the time
// Emits false repeatedly while the watcher is running and within maxTime, then emits true once
// if neither manuallyStopped nor finished is set when the loop ends. The flow body runs on `dispatcher`.
// NOTE(review): the loop contains no delay, and the flags are read and written here without the
// lock that start()/stop() take — confirm this is intended.
fun observeMaxTimeReached(): Flow<Boolean> = flow {
// Check if maxTime is not passed with → (System.nanoTime() - timeStamp) <= maxTime
while (running && !finished && !manuallyStopped && (System.nanoTime() - timeStamp) <= maxTime) {
emit(false)
Timber.d("Currenttime is smaller, everything fine")
}
// This will be executed only when the Worker is running longer than maxTime
if (!manuallyStopped && !finished) {
Timber.d("Currenttime bigger, yikes. Stop worker")
emit(true)
running = false
finished = true
return@flow
// This condition is the logical complement of the branch above.
} else if (finished || manuallyStopped) {
return@flow
}
}.flowOn(dispatcher)
}
我应该如何修改 PagingSource
中的 Coroutine
才能实现我的目标?Timber.d("Mediator stopped")
确实被调用了。
非常感谢您的帮助,谢谢。
您需要测量持续时间吗?时间本来就在流逝,您不需要另一个线程或协程来跟踪它。可以使用 measureNanoTime {}
来测量一段代码块执行所花的时间。
您是否需要在挂起函数中应用超时?withTimeout
正是为此而设计的。示例:
/**
 * Paging source that loads [Product] pages from the Firestore collection [FIREBASE_PRODUCTS].
 *
 * The paging key is the [QuerySnapshot] of the page to display; a `null` key means the first
 * page. Each load is bounded by [LOAD_TIMEOUT_MILLIS] through [withTimeout], so no separate
 * watcher coroutine is needed to detect a stalled request.
 *
 * @property db Firestore instance the pages are queried from.
 * @property shopPagingWatcher Not used by this implementation; kept so the injected
 * constructor signature stays unchanged.
 */
class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    /**
     * Loads the page identified by `params.key` and looks up the page that follows it.
     *
     * @param params Load parameters; `params.key` is the snapshot of the page to display, or
     * `null` to query the first page.
     * @return [LoadResult.Page] with the current page's products and the following snapshot as
     * `nextKey` (`null` when the current page is empty), or [LoadResult.Error] when an
     * [IOException] is thrown or the load exceeds [LOAD_TIMEOUT_MILLIS].
     */
    @InternalCoroutinesApi
    override suspend fun load(
        params: LoadParams<QuerySnapshot>
    ): LoadResult<QuerySnapshot, Product> {
        return try {
            // withTimeout takes the limit in milliseconds and cancels the block when it is exceeded.
            withTimeout(LOAD_TIMEOUT_MILLIS) {
                val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
                    .limit(SHOP_LIST_LIMIT)
                    .get()
                    .await()
                // An empty page has no document to use as a cursor, so there is no next page.
                val nextPage = currentPage.documents.lastOrNull()?.let { lastDocument ->
                    db.collection(FIREBASE_PRODUCTS)
                        .limit(SHOP_LIST_LIMIT)
                        .startAfter(lastDocument)
                        .get()
                        .await()
                }
                LoadResult.Page(
                    data = currentPage.toObjects(),
                    prevKey = null,
                    nextKey = nextPage,
                )
            }
        } catch (e: IOException) {
            LoadResult.Error(e)
        } catch (e: TimeoutCancellationException) {
            // Thrown by withTimeout itself; caught outside the block and reported as a load error.
            LoadResult.Error(e)
        }
    }

    private companion object {
        /** Upper bound for a single [load] call, in milliseconds (5 seconds). */
        const val LOAD_TIMEOUT_MILLIS = 5_000L
    }
}
我正在尝试在我的 PagingSource
中启动一个 Coroutine,
用来监视分页源获取数据已经花了多长时间。我在这里遇到的唯一问题是,即使在我停止 shopPagingWatcher 的 Flow
之后,我的 Coroutine
仍然不知为何在继续收集数据。因此,它会抛出 IOException("No Internet Exception"),
尽管它本不应该抛出。
我之所以启动一个 Coroutine,
是因为监视状态不应该阻塞我的 paging source 的主流程。
PagingSource
/**
 * Paging source that loads [Product] pages from the Firestore collection [FIREBASE_PRODUCTS],
 * using the [QuerySnapshot] of the page to display as the paging key.
 *
 * NOTE(review): despite the "RemoteMediator" name, this class extends [PagingSource].
 */
class ShopRemoteMediator @Inject constructor(
private val db: FirebaseFirestore,
private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {
/**
 * Loads the page given by `params.key` (or queries the first page when the key is null) and
 * also fetches the page that follows it, which is returned as `nextKey`.
 *
 * Exceptions thrown directly inside the `try` body are returned as [LoadResult.Error].
 */
@InternalCoroutinesApi
override suspend fun load(params: LoadParams<QuerySnapshot>): LoadResult<QuerySnapshot, Product> {
return try {
// Launch Async Coroutine, Observe State, throw IO Exception when not loaded within 5 seconds
shopPagingWatcher.start()
// NOTE(review): this coroutine is launched in a brand-new CoroutineScope, so it is not a child
// of the coroutine running load(). An exception thrown inside it is not delivered to the
// surrounding try/catch, and nothing in load() cancels the returned Job.
CoroutineScope(Dispatchers.IO).launch {
shopPagingWatcher.observeMaxTimeReached().collect { maxTimeReached ->
if (maxTimeReached) {
Timber.d("Mediator failed")
throw IOException("No Internet Exception")
}
}
}
// Use the snapshot passed as key, or query the first page when there is no key yet.
val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
.limit(SHOP_LIST_LIMIT)
.get()
.await()
// NOTE(review): when currentPage is empty this indexes documents[-1] — confirm empty pages cannot occur.
val lastDocumentSnapShot = currentPage.documents[currentPage.size() - 1]
// Query the page that starts after the last document of the current page.
val nextPage = db.collection(FIREBASE_PRODUCTS)
.limit(SHOP_LIST_LIMIT)
.startAfter(lastDocumentSnapShot)
.get()
.await()
// When PagingSource is here, it successfully loaded currentPage and nextPage, therefore stop Watcher
// NOTE(review): stop() is only reached on this success path; it is skipped when an exception is thrown above.
Timber.d("Mediator Sucessfull")
shopPagingWatcher.stop()
LoadResult.Page(
data = currentPage.toObjects(),
prevKey = null,
nextKey = nextPage
)
} catch (e: Exception) {
// IOException should be caught here, but it is not! The app crashed instead!
Timber.d("Mediator Exception ist $e")
LoadResult.Error(e)
}
}
}
ShopPagingWatcher
/** Injectable singleton [Workwatcher] handed to the shop paging source; it adds no members of its own. */
@Singleton
class ShopPagingWatcher @Inject constructor() : Workwatcher()
抽象类 Workwatcher
/**
 * Base class for a stopwatch-style watcher that reports, through a [Flow], whether a unit of
 * work has been running for longer than `maxTime`.
 *
 * NOTE(review): every field lives in the private companion object, so the timestamp and the
 * flags are shared by all instances and subclasses rather than being per-watcher state.
 */
abstract class Workwatcher {
private companion object {
// Dispatcher the flow body runs on (see flowOn below); also used as the lock object in start()/stop().
private val dispatcher = Dispatchers.IO
// System.nanoTime() value captured by start(); reading it before it has been assigned throws IllegalStateException.
private var timeStamp by Delegates.notNull<Long>()
private var running = false
private var manuallyStopped = false
private var finished = false
// Limit in nanoseconds (it is compared against System.nanoTime() differences): 5_000_000_000 ns = 5 s.
private const val maxTime: Long = 5000000000L
}
// Push the current timestamp, set running to true
// I don't know if it is necessary to use "synchronized"
@InternalCoroutinesApi
fun start() = synchronized(dispatcher) {
timeStamp = System.nanoTime()
running = true
manuallyStopped = false
finished = false
}
// Manually stop the WorkerHelper
// I don't know if it is necessary to use "synchronized"
@InternalCoroutinesApi
fun stop() = synchronized(dispatcher) {
running = false
manuallyStopped = true
finished = true
Timber.d("Mediator stopped")
}
// Function that observes the time
// Emits false repeatedly while the watcher is running and within maxTime, then emits true once
// if neither manuallyStopped nor finished is set when the loop ends. The flow body runs on `dispatcher`.
// NOTE(review): the loop contains no delay, and the flags are read and written here without the
// lock that start()/stop() take — confirm this is intended.
fun observeMaxTimeReached(): Flow<Boolean> = flow {
// Check if maxTime is not passed with → (System.nanoTime() - timeStamp) <= maxTime
while (running && !finished && !manuallyStopped && (System.nanoTime() - timeStamp) <= maxTime) {
emit(false)
Timber.d("Currenttime is smaller, everything fine")
}
// This will be executed only when the Worker is running longer than maxTime
if (!manuallyStopped && !finished) {
Timber.d("Currenttime bigger, yikes. Stop worker")
emit(true)
running = false
finished = true
return@flow
// This condition is the logical complement of the branch above.
} else if (finished || manuallyStopped) {
return@flow
}
}.flowOn(dispatcher)
}
我应该如何修改 PagingSource
中的 Coroutine
才能实现我的目标?Timber.d("Mediator stopped")
确实被调用了。
非常感谢您的帮助,谢谢。
您需要测量持续时间吗?时间本来就在流逝,您不需要另一个线程或协程来跟踪它。可以使用 measureNanoTime {}
来测量一段代码块执行所花的时间。
您是否需要在挂起函数中应用超时?withTimeout
正是为此而设计的。示例:
/**
 * Paging source that loads [Product] pages from the Firestore collection [FIREBASE_PRODUCTS].
 *
 * The paging key is the [QuerySnapshot] of the page to display; a `null` key means the first
 * page. Each load is bounded by [LOAD_TIMEOUT_MILLIS] through [withTimeout], so no separate
 * watcher coroutine is needed to detect a stalled request.
 *
 * @property db Firestore instance the pages are queried from.
 * @property shopPagingWatcher Not used by this implementation; kept so the injected
 * constructor signature stays unchanged.
 */
class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    /**
     * Loads the page identified by `params.key` and looks up the page that follows it.
     *
     * @param params Load parameters; `params.key` is the snapshot of the page to display, or
     * `null` to query the first page.
     * @return [LoadResult.Page] with the current page's products and the following snapshot as
     * `nextKey` (`null` when the current page is empty), or [LoadResult.Error] when an
     * [IOException] is thrown or the load exceeds [LOAD_TIMEOUT_MILLIS].
     */
    @InternalCoroutinesApi
    override suspend fun load(
        params: LoadParams<QuerySnapshot>
    ): LoadResult<QuerySnapshot, Product> {
        return try {
            // withTimeout takes the limit in milliseconds and cancels the block when it is exceeded.
            withTimeout(LOAD_TIMEOUT_MILLIS) {
                val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
                    .limit(SHOP_LIST_LIMIT)
                    .get()
                    .await()
                // An empty page has no document to use as a cursor, so there is no next page.
                val nextPage = currentPage.documents.lastOrNull()?.let { lastDocument ->
                    db.collection(FIREBASE_PRODUCTS)
                        .limit(SHOP_LIST_LIMIT)
                        .startAfter(lastDocument)
                        .get()
                        .await()
                }
                LoadResult.Page(
                    data = currentPage.toObjects(),
                    prevKey = null,
                    nextKey = nextPage,
                )
            }
        } catch (e: IOException) {
            LoadResult.Error(e)
        } catch (e: TimeoutCancellationException) {
            // Thrown by withTimeout itself; caught outside the block and reported as a load error.
            LoadResult.Error(e)
        }
    }

    private companion object {
        /** Upper bound for a single [load] call, in milliseconds (5 seconds). */
        const val LOAD_TIMEOUT_MILLIS = 5_000L
    }
}