Project Reactor:并行执行缓冲区
Project Reactor: buffer with parallel execution
我需要将日期从一个来源(并行)批量复制到另一个来源。
我这样做了:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.doOnNext(dataTarget::write)
.sequential()
.blockLast();
class dataSource{
public Item getNextItem(){
//...
}
}
class dataTarget{
public void write(List<Item> items){
//...
}
}
它并行接收数据,但一次写入一个。
我需要分批收集它们(比如按 10 项),然后分批编写。
我该怎么做?
更新:
主要思想是源是消息系统(即 rabbitmq 或 nats),适合一条一条地高效发送消息,但目标是数据库,插入批处理效率更高。
所以最终结果应该是这样的——我并行接收消息直到缓冲区没有填满,然后我将所有缓冲区一次性写入数据库。
在常规 java 中很容易做到,但在流的情况下 — 我不知道如何去做。如何缓冲数据以及如何暂停 reader 直到编写器还没有准备好获取下一部分。
您需要在单独的 Publisher
-s 中完成繁重的工作,这些工作将在 flatMap() 中并行实现。像这样
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.flatMap(item -> Mono.fromCallable(() -> dataTarget.write(item)))
.sequential()
.blockLast();
您只需要 Flux#buffer(int maxSize)
操作员:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.buffer(10) //Flux<List<String>>
.flatMap(dataTarget::write)
.blockLast();
class DataTarget{
public Mono<Void> write(List<String> items){
return reactiveDbClient.insert(items);
}
}
在这里,buffer
将项目收集到多个 List
的 10 个项目(批次)中。您不需要使用并行调度程序。 flatmap
将运行 这些操作异步进行。参见 Understanding Reactive’s .flatMap() Operator。
最佳方法(从算法的角度来看)是拥有环形缓冲区并使用微批处理技术。写入 ringbuffer 是从 rabbitmq 一个接一个(或多个并行)完成的。读取线程(仅单个)将立即获取所有消息(在批处理开始时显示),将它们插入数据库并再次执行...一次全部意味着单个消息(如果只有一个)或一堆它们(如果它们是在上次插入的持续时间足够长时累积的)。
此技术也用于 jdbc(如果我没记错的话)并且可以使用 java 中的 lmax disruptor 库轻松实现。
示例项目(使用 ractor /Flux/ 和 System.out.println)可以在 https://github.com/luvarqpp/reactorBatch
上找到
核心代码:
final Flux<String> stringFlux = Flux.interval(Duration.ofMillis(1)).map(x -> "Msg number " + x);
final Flux<List<String>> stringFluxMicrobatched = stringFlux
.bufferTimeout(100, Duration.ofNanos(1));
stringFluxMicrobatched.subscribe(strings -> {
// Batch insert into DB
System.out.print("Inserting in batch " + strings.size() + " strings.");
try {
// Inserting into db is simulated by 10 to 40 ms sleep here...
Thread.sleep(rnd.nextInt(30) + 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" ... Done");
});
欢迎使用技术名称和参考文献编辑和改进此 post。这是社区维基...
我需要将日期从一个来源(并行)批量复制到另一个来源。
我这样做了:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.doOnNext(dataTarget::write)
.sequential()
.blockLast();
class dataSource{
public Item getNextItem(){
//...
}
}
class dataTarget{
public void write(List<Item> items){
//...
}
}
它并行接收数据,但一次写入一个。
我需要分批收集它们(比如按 10 项),然后分批编写。
我该怎么做?
更新:
主要思想是源是消息系统(即 rabbitmq 或 nats),适合一条一条地高效发送消息,但目标是数据库,插入批处理效率更高。
所以最终结果应该是这样的——我并行接收消息直到缓冲区没有填满,然后我将所有缓冲区一次性写入数据库。
在常规 java 中很容易做到,但在流的情况下 — 我不知道如何去做。如何缓冲数据以及如何暂停 reader 直到编写器还没有准备好获取下一部分。
您需要在单独的 Publisher
-s 中完成繁重的工作,这些工作将在 flatMap() 中并行实现。像这样
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.flatMap(item -> Mono.fromCallable(() -> dataTarget.write(item)))
.sequential()
.blockLast();
您只需要 Flux#buffer(int maxSize)
操作员:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.buffer(10) //Flux<List<String>>
.flatMap(dataTarget::write)
.blockLast();
class DataTarget{
public Mono<Void> write(List<String> items){
return reactiveDbClient.insert(items);
}
}
在这里,buffer
将项目收集到多个 List
的 10 个项目(批次)中。您不需要使用并行调度程序。 flatmap
将运行 这些操作异步进行。参见 Understanding Reactive’s .flatMap() Operator。
最佳方法(从算法的角度来看)是拥有环形缓冲区并使用微批处理技术。写入 ringbuffer 是从 rabbitmq 一个接一个(或多个并行)完成的。读取线程(仅单个)将立即获取所有消息(在批处理开始时显示),将它们插入数据库并再次执行...一次全部意味着单个消息(如果只有一个)或一堆它们(如果它们是在上次插入的持续时间足够长时累积的)。
此技术也用于 jdbc(如果我没记错的话)并且可以使用 java 中的 lmax disruptor 库轻松实现。
示例项目(使用 ractor /Flux/ 和 System.out.println)可以在 https://github.com/luvarqpp/reactorBatch
上找到核心代码:
final Flux<String> stringFlux = Flux.interval(Duration.ofMillis(1)).map(x -> "Msg number " + x);
final Flux<List<String>> stringFluxMicrobatched = stringFlux
.bufferTimeout(100, Duration.ofNanos(1));
stringFluxMicrobatched.subscribe(strings -> {
// Batch insert into DB
System.out.print("Inserting in batch " + strings.size() + " strings.");
try {
// Inserting into db is simulated by 10 to 40 ms sleep here...
Thread.sleep(rnd.nextInt(30) + 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" ... Done");
});
欢迎使用技术名称和参考文献编辑和改进此 post。这是社区维基...