尝试汇总分组值时减少挂起
Reduce hanging when trying to sum up grouped values
我正在尝试使用 Project Reactor 链设置来收集和分组值,以最终按组汇总它们。该集合分为两个部分并分块。
在一个简化的示例中,我能够重现该问题。首先,我在 createWrappers() 中收集了一些通用数据,这些数据从网络读取数据(阻塞调用)。当数据被检索时,对象被发出。在第二步中,从不同的阻塞网络位置收集详细信息,并将该信息添加到包装器部分。然后将数据转换为详细信息列表,按详细信息键分组,最后按详细信息键汇总。最后应该生成一个看起来像这样的地图(值是特定于测试用例的):
key value
------------------
detail-0 1000
detail-1 2000
detail-2 3000
...
只要我将 block() 添加到 reduce() 部分,所有内容都会挂在下面的示例代码中:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestBlockingIssue
{
@Test
public void testBlockingMap()
{
final Flux<Wrapper> source = Flux.create( sink -> createWrappers( 1000, sink ) );
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.collectMap( group -> group.key(), group -> group.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).block() ).block();
System.out.println( block );
}
private Wrapper enhanceWrapper( final Wrapper wrapper, final int count )
{
for ( int i = 0; i < count; i++ )
{
wrapper.detailsList.add( new Details( "detail-" + i, new BigDecimal( i +1 ) ) );
}
return wrapper;
}
private void createWrappers( final int count, final FluxSink<Wrapper> sink )
{
for ( int i = 0; i < count; i++ )
{
sink.next( new Wrapper( "Wrapper-" + i ) );
}
sink.complete();
}
private class Details
{
final String detailKey;
final BigDecimal value;
private Details( final String detailKey, final BigDecimal value )
{
this.detailKey = detailKey;
this.value = value;
}
}
private class Wrapper
{
final String lookupKey;
final List<Details> detailsList = new ArrayList<>();
private Wrapper( final String lookupKey )
{
this.lookupKey = lookupKey;
}
}
}
我怎样才能解决挂链的问题,或者我必须使用哪些替代方法来生成地图?
当对太多组使用 groupBy 并且下游速度不足以消耗组时,会发生这种情况。在您的示例中,您不应该在收集地图中阻塞,但您应该在收集之前使用该组,例如:
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.flatMap(g -> g.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).map(v -> Tuples.of(g.key(), v)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block();
所以现在下游已经足够快了,但是你可能需要根据组的数量调整并发。并确保您的群组数量较少。
我正在尝试使用 Project Reactor 链设置来收集和分组值,以最终按组汇总它们。该集合分为两个部分并分块。
在一个简化的示例中,我能够重现该问题。首先,我在 createWrappers() 中收集了一些通用数据,这些数据从网络读取数据(阻塞调用)。当数据被检索时,对象被发出。在第二步中,从不同的阻塞网络位置收集详细信息,并将该信息添加到包装器部分。然后将数据转换为详细信息列表,按详细信息键分组,最后按详细信息键汇总。最后应该生成一个看起来像这样的地图(值是特定于测试用例的):
key value
------------------
detail-0 1000
detail-1 2000
detail-2 3000
...
只要我将 block() 添加到 reduce() 部分,所有内容都会挂在下面的示例代码中:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestBlockingIssue
{
@Test
public void testBlockingMap()
{
final Flux<Wrapper> source = Flux.create( sink -> createWrappers( 1000, sink ) );
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.collectMap( group -> group.key(), group -> group.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).block() ).block();
System.out.println( block );
}
private Wrapper enhanceWrapper( final Wrapper wrapper, final int count )
{
for ( int i = 0; i < count; i++ )
{
wrapper.detailsList.add( new Details( "detail-" + i, new BigDecimal( i +1 ) ) );
}
return wrapper;
}
private void createWrappers( final int count, final FluxSink<Wrapper> sink )
{
for ( int i = 0; i < count; i++ )
{
sink.next( new Wrapper( "Wrapper-" + i ) );
}
sink.complete();
}
private class Details
{
final String detailKey;
final BigDecimal value;
private Details( final String detailKey, final BigDecimal value )
{
this.detailKey = detailKey;
this.value = value;
}
}
private class Wrapper
{
final String lookupKey;
final List<Details> detailsList = new ArrayList<>();
private Wrapper( final String lookupKey )
{
this.lookupKey = lookupKey;
}
}
}
我怎样才能解决挂链的问题,或者我必须使用哪些替代方法来生成地图?
当对太多组使用 groupBy 并且下游速度不足以消耗组时,会发生这种情况。在您的示例中,您不应该在收集地图中阻塞,但您应该在收集之前使用该组,例如:
final Map<String, BigDecimal> block = source.parallel( 10 ).runOn( Schedulers.boundedElastic() )
.map( wrapper -> enhanceWrapper( wrapper, 100 ) )
.flatMap( wrapper -> Flux.fromIterable( wrapper.detailsList ) )
.sequential()
.groupBy( details -> details.detailKey )
.cache()
.flatMap(g -> g.reduce( new BigDecimal( 0 ), ( x, y ) -> x.add( y.value ) ).map(v -> Tuples.of(g.key(), v)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block();
所以现在下游已经足够快了,但是你可能需要根据组的数量调整并发。并确保您的群组数量较少。