Project Reactor:并行执行缓冲区

Project Reactor: buffer with parallel execution

我需要将日期从一个来源(并行)批量复制到另一个来源。

我这样做了:

 Flux.generate((SynchronousSink<String> sink) -> {
                    try {
                        String val = dataSource.getNextItem();
                        if (val == null) {
                            sink.complete();
                            return;
                        }
                        sink.next(val);

                    } catch (InterruptedException e) {
                        sink.error(e);
                    }
                })
                .parallel(4)
                .runOn(Schedulers.parallel())
                .doOnNext(dataTarget::write)
                .sequential()
                .blockLast();
class dataSource{
  public Item getNextItem(){ 
    //...
  }
}
class dataTarget{
  public void write(List<Item> items){ 
    //...
  }
}

它并行接收数据,但一次写入一个。

我需要分批收集它们(比如按 10 项),然后分批编写。

我该怎么做?

更新:

主要思想是源是消息系统(即 rabbitmq 或 nats),适合一条一条地高效发送消息,但目标是数据库,插入批处理效率更高。

所以最终结果应该是这样的——我并行接收消息直到缓冲区没有填满,然后我将所有缓冲区一次性写入数据库。

在常规 java 中很容易做到,但在流的情况下 — 我不知道如何去做。如何缓冲数据以及如何暂停 reader 直到编写器还没有准备好获取下一部分。

您需要在单独的 Publisher-s 中完成繁重的工作,这些工作将在 flatMap() 中并行实现。像这样

Flux.generate((SynchronousSink<String> sink) -> {
    try {
        String val = dataSource.getNextItem();
        if (val == null) {
            sink.complete();
            return;
        }
        sink.next(val);

    } catch (InterruptedException e) {
        sink.error(e);
    }
})
.parallel(4)
.runOn(Schedulers.parallel())
.flatMap(item -> Mono.fromCallable(() -> dataTarget.write(item)))
.sequential()
.blockLast();

您只需要 Flux#buffer(int maxSize) 操作员:

Flux.generate((SynchronousSink<String> sink) -> {
        try {
            String val = dataSource.getNextItem();
            if (val == null) {
                sink.complete();
                return;
            }
            sink.next(val);

        } catch (InterruptedException e) {
            sink.error(e);
        }
    })
    .buffer(10) //Flux<List<String>>
    .flatMap(dataTarget::write)
    .blockLast();

class DataTarget{
    public Mono<Void> write(List<String> items){
         return reactiveDbClient.insert(items);
    }
}

在这里,buffer 将项目收集到多个 List 的 10 个项目(批次)中。您不需要使用并行调度程序。 flatmap 将运行 这些操作异步进行。参见 Understanding Reactive’s .flatMap() Operator

最佳方法(从算法的角度来看)是拥有环形缓冲区并使用微批处理技术。写入 ringbuffer 是从 rabbitmq 一个接一个(或多个并行)完成的。读取线程(仅单个)将立即获取所有消息(在批处理开始时显示),将它们插入数据库并再次执行...一次全部意味着单个消息(如果只有一个)或一堆它们(如果它们是在上次插入的持续时间足够长时累积的)。

此技术也用于 jdbc(如果我没记错的话)并且可以使用 java 中的 lmax disruptor 库轻松实现。

示例项目(使用 ractor /Flux/ 和 System.out.println)可以在 https://github.com/luvarqpp/reactorBatch

上找到

核心代码:

    final Flux<String> stringFlux = Flux.interval(Duration.ofMillis(1)).map(x -> "Msg number " + x);

    final Flux<List<String>> stringFluxMicrobatched = stringFlux
            .bufferTimeout(100, Duration.ofNanos(1));

    stringFluxMicrobatched.subscribe(strings -> {
        // Batch insert into DB
        System.out.print("Inserting in batch " + strings.size() + " strings.");
        try {
            // Inserting into db is simulated by 10 to 40 ms sleep here...
            Thread.sleep(rnd.nextInt(30) + 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(" ... Done");
    });

欢迎使用技术名称和参考文献编辑和改进此 post。这是社区维基...