Concurrency behavior difference between flatMap and parallel with variable processing times

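I'm seeing different behavior in the following two pipelines, which I expected to behave similarly or identically. The intent is to process items with a concurrency level of x (4 in the example) without holding up any item from being processed while it sits in a 'buffer'.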

I have recreated the scenario here, using Flowable.range(1, 1280) as the source and simulating some 'slow processing' on item 2, which simply blocks for 5 seconds.

Flowable.range(1, 1280) // cold flowable, items are produced 'on-demand'
    .doOnNext { logEvent("produced $it") }
    .parallel(4, 1) // parallelism is 4, prefetch is 1
    .runOn(Schedulers.computation(), 1) // again use a prefetch of 1
    .doOnNext(::process)
    .sequential()
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()

For this I get output like the following:

...
2021-04-12T12:15:49.147 - [main] produced 4
2021-04-12T12:15:49.147 - [main] produced 5
2021-04-12T12:15:49.147 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:15:49.147 - [RxComputationThreadPool-4] fast processing 4
...
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] fast processing 1278
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] done with 2

The breakdown of the number of items processed per thread in this case looks like this:

RxComputationThreadPool-2: 1
RxComputationThreadPool-4: 429
RxComputationThreadPool-3: 416
RxComputationThreadPool-1: 434

Also note the times in the log: all items except 2 are processed within 1 second, and item 2 completes after 5 seconds, as expected.

Now I'd expect to achieve similar behavior with this flatMap approach:

Flowable.range(1, 1280)
    .doOnNext { logEvent("produced $it") }
    .flatMapSingle({ Single.fromCallable { process(it); it }.subscribeOn(Schedulers.computation()) },
                   true, 4) // delayErrors (true or false doesn't matter), and maxConcurrency
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()

But I get output like this:

...
2021-04-12T12:29:24.452 - [main] produced 4
2021-04-12T12:29:24.454 - [RxComputationThreadPool-1] fast processing 1
2021-04-12T12:29:24.454 - [RxComputationThreadPool-3] fast processing 3
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] done with 1
2021-04-12T12:29:24.455 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] produced 5
...
2021-04-12T12:29:24.458 - [RxComputationThreadPool-8] produced 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] fast processing 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] done with 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] produced 26
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] done with 2
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] produced 27
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] fast processing 10
...
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] produced 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-7] fast processing 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-8] fast processing 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1280

And the breakdown of the number of items processed per thread is always evenly distributed, like this:

RxComputationThreadPool-3: 160
RxComputationThreadPool-4: 160
RxComputationThreadPool-5: 160
RxComputationThreadPool-7: 160
RxComputationThreadPool-2: 160
RxComputationThreadPool-6: 160
RxComputationThreadPool-1: 160
RxComputationThreadPool-8: 160

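Here, note the time gap between "produced 26" and "slow processing 2 done". In fact, the 'single' that processes 26 isn't even created until 2 is done processing, even though item 2 is the only item being processed at that point.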

Why does processing one item block the processing of other items when using flatMap, even though maxConcurrency is set to 4?

Utilities used above, for reference:

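        import java.time.Instant
        import java.time.LocalDateTime
        import java.time.ZoneId
        import java.util.concurrent.ConcurrentHashMap
        import java.util.concurrent.ConcurrentLinkedDeque
        import java.util.concurrent.ConcurrentMap

        data class Event(val thread: Thread, val timeMillis: Long, val msg: String) {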
            val localTime: LocalDateTime by lazy (LazyThreadSafetyMode.NONE) {
                Instant.ofEpochMilli(timeMillis).atZone(ZoneId.systemDefault()).toLocalDateTime()
            }
            override fun toString(): String = "${localTime} - [${thread.name}] $msg"
        }
        val events: ConcurrentLinkedDeque<Event> = ConcurrentLinkedDeque()
        fun logEvent(msg: String) {
            events.add(Event(Thread.currentThread(), System.currentTimeMillis(), msg))
        }
        val countsByThread: ConcurrentMap<Thread, Int> = ConcurrentHashMap()
        fun process(it: Int) {
            val thread = Thread.currentThread()
            countsByThread.compute(thread) { _, prev -> if (prev != null) prev + 1 else 1 }
            if (it == 2) {
                logEvent("slow processing $it")
                Thread.sleep(5_000)
                logEvent("slow processing $it done")
            } else {
                logEvent("fast processing $it")
            }
        }

RxJava 2, version 2.2.21

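With the parallel setup, you get a fixed number of rails that demand more items as they make progress. Since only one rail is stalled for a long time, the other 3 rails can keep requesting items and being served.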
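
As a rough illustration of why the other rails keep going, the dispatch can be modelled without RxJava at all. The sketch below assumes items are handed out round-robin to the rails and that a rail with no outstanding demand is simply skipped. railCounts and its parameters are hypothetical names, and a real run will not split this evenly because of timing.

```kotlin
// Pure model of rail dispatch; no RxJava involved.
// Assumptions: round-robin over rails, the rail that receives slowItem
// never requests again, and rails >= 2.
fun railCounts(rails: Int, slowItem: Int, lastItem: Int): IntArray {
    val counts = IntArray(rails)
    var blockedRail = -1
    var rail = 0
    for (item in 1..lastItem) {
        // A stalled rail has no demand, so the dispatcher moves on to the next one.
        if (rail == blockedRail) rail = (rail + 1) % rails
        counts[rail]++
        if (item == slowItem) blockedRail = rail
        rail = (rail + 1) % rails
    }
    return counts
}

fun main() {
    println(railCounts(rails = 4, slowItem = 2, lastItem = 1280).toList()) // [427, 1, 426, 426]
}
```

The shape matches the breakdown in the question (one item on the stalled thread, the rest shared among the other three), although the exact counts there differ.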

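With the flatMap setup, each item gets assigned to a Scheduler in a round-robin fashion: item-1-scheduler-1, item-2-scheduler-2, ..., item-5-scheduler-1, item-6-scheduler-2. If item-N bogs down a scheduler, item-(N+4), item-(N+8), etc. will wait for item-N to finish. So, for example, when item-5-scheduler-1 completes, the next item will be assigned to the blocked scheduler-2. After a couple of items, all 4 active items will be waiting on scheduler-2.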
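
The stall can be reproduced on paper with a small model. This sketch assumes item i is run on worker (i - 1) % workers and that nothing finishes on the blocked worker until the slow item is done. stuckItems and its parameters are hypothetical names. The log in the question shows eight computation threads, so there the stride is 8 rather than 4.

```kotlin
// Pure model of the round-robin assignment; no RxJava involved.
// Assumption: item i (1-based) lands on worker (i - 1) % workers, and the
// blocked worker completes nothing while the slow item is sleeping.
fun stuckItems(workers: Int, maxConcurrency: Int, slowItem: Int, lastItem: Int): List<Int> {
    val blockedWorker = (slowItem - 1) % workers
    val stuck = mutableListOf<Int>()
    for (item in slowItem..lastItem) {
        // Items on other workers finish at once and free their slot;
        // items queued behind the slow one keep their slot occupied.
        if ((item - 1) % workers == blockedWorker) stuck += item
        // Once every slot is held by a waiting item, nothing more is requested.
        if (stuck.size == maxConcurrency) break
    }
    return stuck
}

fun main() {
    println(stuckItems(workers = 8, maxConcurrency = 4, slowItem = 2, lastItem = 1280)) // [2, 10, 18, 26]
    println(stuckItems(workers = 4, maxConcurrency = 4, slowItem = 2, lastItem = 1280)) // [2, 6, 10, 14]
}
```

With eight workers the fourth waiting item is 26, which lines up with "produced 26" being the last line before the 5-second gap in the log, and with "fast processing 10" appearing right after item 2 finishes.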