尝试汇总分组值时减少挂起

Reduce hanging when trying to sum up grouped values

我正在尝试使用 Project Reactor 链设置来收集和分组值,以最终按组汇总它们。该集合分为两个部分并分块。

在一个简化的示例中,我能够重现该问题。首先,我在 createWrappers() 中收集了一些通用数据,这些数据从网络读取数据(阻塞调用)。当数据被检索时,对象被发出。在第二步中,从不同的阻塞网络位置收集详细信息,并将该信息添加到包装器部分。然后将数据转换为详细信息列表,按详细信息键分组,最后按详细信息键汇总。最后应该生成一个看起来像这样的地图(值是特定于测试用例的):

key         value
------------------
detail-0    1000
detail-1    2000
detail-2    3000
...

只要我将 block() 添加到 reduce() 部分,所有内容都会挂在下面的示例代码中:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TestBlockingIssue
{
    @Test
    public void testBlockingMap()
    {
        final Flux<Wrapper> source = Flux.create( sink -> createWrappers( 1000, sink ) );

        final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
                .map( wrapper -> enhanceWrapper( wrapper, 100 ) )
                .flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
                .sequential()
                .groupBy( details -> details.detailKey )
                .cache()
                .collectMap( group -> group.key(), group -> group.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).block() ).block();

        System.out.println( block );
    }

    private Wrapper enhanceWrapper( final Wrapper wrapper, final int count )
    {
        for ( int i = 0; i < count; i++ )
        {
            wrapper.detailsList.add( new Details( "detail-" + i, new BigDecimal( i +1 ) ) );
        }
        return wrapper;
    }

    private void createWrappers( final int count, final FluxSink<Wrapper> sink )
    {
        for ( int i = 0; i < count; i++ )
        {
            sink.next( new Wrapper( "Wrapper-" + i ) );
        }
        sink.complete();
    }

    private class Details
    {
        final String detailKey;

        final BigDecimal value;

        private Details( final String detailKey, final BigDecimal value )
        {
            this.detailKey = detailKey;
            this.value = value;
        }
    }

    private class Wrapper
    {
        final String lookupKey;

        final List<Details> detailsList = new ArrayList<>();

        private Wrapper( final String lookupKey )
        {
            this.lookupKey = lookupKey;
        }
    }
}

我怎样才能解决挂链的问题,或者我必须使用哪些替代方法来生成地图?

当对太多组使用 groupBy 并且下游速度不足以消耗组时,会发生这种情况。在您的示例中,您不应该在收集地图中阻塞,但您应该在收集之前使用该组,例如:

final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
    .map( wrapper -> enhanceWrapper( wrapper, 100 ) )
    .flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
    .sequential()
    .groupBy( details -> details.detailKey )
    .cache()
    .flatMap(g -> g.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).map(v -> Tuples.of(g.key(), v)))
    .collectMap(Tuple2::getT1, Tuple2::getT2)
    .block();

所以现在下游已经足够快了,但是你可能需要根据组的数量调整并发。并确保您的群组数量较少。