Kotlin - How to run n coroutines and wait for first m results or timeout?

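I'm trying to write a function that launches n coroutines and waits for the first m of them to complete. If m coroutines have not completed within a given timeout, all of the coroutines/jobs should be cancelled. My initial implementation is shown below, but I feel it could be improved. My original idea was to run all of the jobs under a parent job, so that cancelling the parent would cascade to the remaining children. However, this results in a TimeoutCancellationException that has to be caught.

How can I write a function that launches n coroutines and waits until the first m of them complete, or until a timeout occurs before m of them have completed?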

private suspend fun queryAllHosts(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , maxSuccessfulHosts: Int
        , queryTimeout: Long
        , requestTimeout: Long
): ArrayList<QueryResult<ResultModel>> {
    val results = ArrayList<QueryResult<ResultModel>>()
    val rootJob = Job()

    try {
        withTimeout(queryTimeout, TimeUnit.MILLISECONDS) {
            queryFactories.map {
                async(parent = rootJob) {
                    val pagedResult = queryHost(
                            it
                            , query
                            , pageIndex
                            , requestTimeout
                    )

                    if (pagedResult.isSuccessful()) {
                        results.add(pagedResult)
                    }

                    if (results.size == maxSuccessfulHosts) {
                        rootJob.cancelAndJoin()
                        return@async
                    }
                }
            }.awaitAll()
        }
    } catch (ex: TimeoutCancellationException) {
        Log.w(Tag, "Query timed out, successful queries: ${results.size}")
    } catch (ignored: JobCancellationException) {
        // Ignored
    } catch (ex: Exception) {
        Log.w(Tag, "Unexpected exception", ex)
    }

    return results
}

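Update

I had no luck with the previously accepted answer because of concurrent modification exceptions. Below is a slight adjustment of the original answer that avoids the concurrent modification exception; however, it does not respect the timeout.

How can I avoid the concurrent modification exception and still respect the timeout?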

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = HashSet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticker = ticker(timeoutMs)

    forEach { deferred ->
        deferred.invokeOnCompletion {
            if (!deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed) Value: ${deferred.getCompleted()}")
            } else {
                Log.d(Tag, "(Completed) Exception: $it")
            }
        }
    }

    val processed = HashSet<Deferred<T>>()

    val elapsedTime = measureTimeMillis {
        whileSelect {
            toAwait.minus(processed).forEach { deferred ->
                processed.add(deferred)
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }

            ticker.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

The deferred instances are now created with the following code:

private fun makeRequest(
        url: String
        , timeoutMs: Int
): Document? = try {
    Jsoup.connect(url).timeout(timeoutMs).get()
} catch (ex: Exception) {
    null
}

private fun createAsyncRequests(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , timeoutMs: Int
): List<Deferred<QueryResult<TorrentResult>>> = queryFactories.map { queryFactory ->
    async(start = CoroutineStart.LAZY) {
        try {
            val url = queryFactory(query, pageIndex)
            makeRequest(
                    url
                    , timeoutMs
            ).getQueryResult(pageIndex, url)
        } catch (ex: Exception) {
            QueryResult<TorrentResult>(state = QueryResult.State.ERROR)
        }
    }
}

Update

The log below shows that the provided 2,000 ms timeout is not respected: the timeout did not occur until 11,861 ms had elapsed:

2018-09-13 22:39:00.307 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.315 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.454 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|1/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://indiapirate.com/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.455 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.456 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.470 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|2/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.471 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.472 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.479 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|3/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|4/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.482 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.500 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|5/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.be/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|6/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay.nz/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.578 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|7/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay6.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.603 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|8/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.605 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|9/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://uktpb.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.622 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.694 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|10/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxyproxy.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.712 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|11/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://fastpirate.link/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|12/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freepirate.eu/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.480 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|13/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://pirate.tel/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.481 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.482 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|14/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratesbay.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.650 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.780 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|15/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.781 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.782 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.131 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|16/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.250 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|17/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.251 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.253 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|18/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freeproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.297 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.441 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|19/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.442 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.443 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:04.826 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|20/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:04.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|21/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:05.325 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|22/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:05.926 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|23/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.002 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|24/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.117 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|25/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:06.338 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|26/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.923 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|27/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebay.red/search/hobbit+1977/0/7
2018-09-13 22:39:07.214 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|28/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:07.625 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|29/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateway.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.398 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|30/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:08.431 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|31/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.blue/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.684 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|32/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepiratepirate.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.033 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|33/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://unblocktpb.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.759 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|34/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:09.879 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|35/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbunblock.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.961 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|36/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.554 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|37/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:10.715 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|38/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://Piratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.855 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|39/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:11.014 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|40/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.fun/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.298 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|41/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.313 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|42/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.review/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.932 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|43/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.life/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.166 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|44/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.one/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.168 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Elapsed time: 11861
2018-09-13 22:39:12.885 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|45/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@b129740
2018-09-13 22:39:13.437 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|46/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c801679
2018-09-13 22:39:14.051 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|47/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@3f89fbe
2018-09-13 22:39:14.154 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|48/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c7c181f
2018-09-13 22:39:14.658 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|49/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@729e76c
2018-09-13 22:39:17.334 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|50/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@ae2a135

Updated code with logging (I removed the TimeoutException and return false instead):

suspend fun <T> List<Deferred<T>>.awaitCount(
        count: Int
        , timeoutMs: Long
): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)
    var completedCount = 0

    forEach { deferred ->
        deferred.invokeOnCompletion {
            completedCount++

            if (deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed|$completedCount/$size) Exception: $it")
            } else {
                Log.d(Tag, "(Completed|$completedCount/$size) Value: ${deferred.getCompleted()}")
            }
        }
    }

    val elapsedTime = measureTimeMillis {
        whileSelect {
            ticket.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }

            toAwait.forEach { deferred ->
                Log.d(Tag, "Starting deferred..")
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

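This can be improved by avoiding the extra launched tasks and the root job altogether.

kotlinx.coroutines has a select clause for complex operators like this one, and it fits your use case well. It is also easy to generalize: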

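suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    // Thread-safe set, so entries can be removed while the set is being iterated
    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    // Ticker channel; its first element arrives after timeoutMs
    val ticket = ticker(timeoutMs)

    // whileSelect repeats the select until the chosen clause returns false
    whileSelect {
        toAwait.forEach { deferred ->
            deferred.onAwait {
                toAwait.remove(deferred)
                result.add(it)
                // Keep looping until count results have been collected
                result.size != count
            }
        }

        // Timeout: cancel everything still pending and fail
        ticket.onReceive {
            val e = TimeoutException()
            toAwait.forEach { it.cancel(e) }
            throw e
        }
    }

    return result
}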

It can then be used in queryAllHosts:

val queries = queryFactories.map { queryHost(...) }
return queries.awaitCount(maxSuccessfulHosts, queryTimeout)

You can adjust awaitCount to your needs, for example by specializing it for Retrofit and adding an isSuccessful check.
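One possible adjustment along those lines is sketched below. It is only a sketch, and it assumes a current stable kotlinx.coroutines release rather than the experimental package that appears in the log. The name `awaitCountOrTimeout` and the `accept` predicate are hypothetical and do not come from the code above. Instead of a ticker it uses `withTimeoutOrNull` around a plain `select` loop, so a timeout yields the partial results rather than throwing, and the result list is only ever touched by the awaiting coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

/**
 * Hypothetical variant of awaitCount: waits until [count] deferreds have
 * produced a value accepted by [accept], or until [timeoutMs] elapses,
 * then cancels whatever is still pending. Returns the accepted values
 * collected so far, which may be fewer than [count].
 */
suspend fun <T> List<Deferred<T>>.awaitCountOrTimeout(
    count: Int,
    timeoutMs: Long,
    accept: (T) -> Boolean = { true } // assumption: stands in for an isSuccessful check
): List<T> {
    require(count in 0..size)

    val pending = toMutableSet()
    val accepted = ArrayList<T>(count)

    try {
        // Returns null on timeout instead of throwing to the caller.
        withTimeoutOrNull(timeoutMs) {
            while (accepted.size < count && pending.isNotEmpty()) {
                // Suspend until any one of the pending deferreds completes.
                val (finished, value) = select<Pair<Deferred<T>, T>> {
                    pending.forEach { d -> d.onAwait { v -> d to v } }
                }
                // The set is modified only here, after select has returned.
                pending.remove(finished)
                if (accept(value)) accepted.add(value)
            }
        }
    } finally {
        // Enough results, timeout, or failure: cancel the rest either way.
        pending.forEach { it.cancel() }
    }
    return accepted
}
```

With the types from the question, the call site might look like `queries.awaitCountOrTimeout(maxSuccessfulHosts, queryTimeout) { it.isSuccessful() }`. Note that cancellation is cooperative, so a blocking call inside a deferred (such as a synchronous HTTP request) may keep running for a while after it has been cancelled, even though this function itself returns on time.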