在 flatMap 上使用 reduce 时 Reactor Flux 订阅者流停止

Reactor Flux subscriber stream stopped when using reduce on flatMap

我想更改我的单用户密码。现在我有

auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
        s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));

此代码工作正常 reduce 方法非常简单。我尝试为单个订阅者更改我的代码

    auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
        .flatMap(window -> window.groupBy(Auction::getItem))
        .flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
        .subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));

这是我的代码,但这段代码不起作用。没有错误也没有结果。调试后我发现当我减少流时代码卡在第二个 flatMap 上。我认为问题在于 flatMap 合并,坚持 Mono 解决。有人现在如何解决这个问题并只使用单个订阅者?

如何复制,你可以使用另一个class或创建一个。小号有效,大号快死

List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
    Auction a = new Auction((long) i, "test");
    a.setItem((long) (i%50));
    auctionList.add(a);
}

Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
        longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));

这种方法立竿见影,但我使用了 3 个订阅者

Flux.fromIterable(auctionList)
        .groupBy(Auction::getId)
        .subscribe(
                auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
                )
        ));

添加 parallel 解决了问题,但我正在寻找答案,为什么会显着降低 flatMap 的速度。

我认为您描述的行为与 groupByflatMap 链接之间的交互有关。 检查 groupBy 文档。它指出:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

默认情况下,maxConcurrency (flatMap)设置为256(我查看了3.2.2的源代码)。所以, 选择超过 256 个组可能会导致执行挂起(尤其是当所有执行都发生在同一线程上时)。

以下代码有助于理解当您链接运算符 groupBy 和 flatMap 时会发生什么:

@Test
public void groupAndFlatmapTest() {
    val groupCount = 257;
    val groupSize = 513;
    val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
    val source = Flux.fromIterable(list)
            .groupBy(i -> i % groupCount)
            .flatMap(Flux::collectList);
    StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}

此代码的执行挂起。将 groupCount 更改为 256 或更少会使测试通过(对于 groupSize 的每个值)。

因此,关于您的原始问题,您很可能正在使用您的键选择器创建大量组 Auction::getItem