Using Reactor's Flux.buffer to batch work only works for a single item
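I'm trying to use Flux.buffer() to batch up loads from a database.
The use case is that loading records from the DB may be 'bursty', and I'd like to introduce a small buffer to group loads together where possible.
My conceptual approach has been to use some form of processor, publish to its sink, let that buffer, and then subscribe to and filter for the result I want.
I've tried multiple different approaches (different types of processors, creating the filtered Mono in different ways).
Below is where I've got to so far, largely by stumbling.
Currently, this returns a single result, but subsequent calls are dropped (though I'm unsure where).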
class BatchLoadingRepository {
    // I've tried all manner of different processors here. I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
        .bufferTimeout(50, Duration.ofMillis(50))
        // I'm unsure if concatMapIterable is the correct operator here,
        // but it seems to work.
        // I'm really trying to turn the List<MyEntity>
        // into a stream of MyEntity, published on the Flux<>
        .concatMapIterable { requestedIds ->
            // this is a Spring Data repository. It returns List<MyEntity>
            repository.findAllById(requestedIds)
        }

    // Multiple callers will invoke this method, and then subscribe to receive
    // their entity back.
    fun findByIdAsync(id: String): Mono<MyEntity> {
        // Is there a potential race condition here, caused by a result
        // on the resultsStream, before I've subscribed?
        return Mono.create<MyEntity> { sink ->
            bufferPublisher.sink().next(id)
            resultsStream.filter { it.id == id }
                .subscribe { next ->
                    sink.success(next)
                }
        }
    }
}
Hi, I've been testing your code, and I think the best approach is to use a shared EmitterProcessor. I ran a test with an EmitterProcessor and it seems to work.
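import java.time.Duration;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Fields and methods of a class implementing CommandLineRunner.
// Note: EmitterProcessor is deprecated since Reactor 3.4 in favor of Sinks.
Flux<String> fluxi;
EmitterProcessor<String> emitterProcessor;

@Override
public void run(String... args) throws Exception {
    emitterProcessor = EmitterProcessor.create();
    // share() lets every caller subscribe to the same buffered stream.
    fluxi = emitterProcessor.share()
            .bufferTimeout(500, Duration.ofMillis(500))
            .concatMapIterable(o -> o);
    Flux.range(0, 1000)
            .flatMap(integer -> findByIdAsync(integer.toString()))
            .doOnNext(System.out::println)
            .subscribe();
}

private Mono<String> findByIdAsync(String id) {
    return Mono.create(monoSink -> {
        // Subscribe first, then publish the id, so the result cannot be missed.
        // Compare strings with equals() rather than ==.
        fluxi.filter(s -> s.equals(id)).subscribe(monoSink::success);
        emitterProcessor.onNext(id);
    });
}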
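The ordering concern from the question (a result arriving before the caller is listening) can also be illustrated without Reactor. What follows is only a minimal plain-JDK sketch, not the code from either post: it swaps the Flux for `CompletableFuture`, and the names `BatchingLoader`, `batchLookup`, and `flush` are hypothetical. The `batchLookup` function stands in for something like `repository.findAllById`. The point it shows is that each caller's future is registered before its id is queued, so a batch result always has somewhere to go.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration: plain JDK, not Reactor.
class BatchingLoader {
    private final int batchSize;
    // Assumed stand-in for a bulk query: ids in, id -> entity out.
    private final Function<List<String>, Map<String, String>> batchLookup;
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();
    private final List<String> queued = new ArrayList<>();

    public BatchingLoader(int batchSize,
                          Function<List<String>, Map<String, String>> batchLookup) {
        this.batchSize = batchSize;
        this.batchLookup = batchLookup;
    }

    public CompletableFuture<String> findByIdAsync(String id) {
        List<String> batch = null;
        CompletableFuture<String> future;
        synchronized (this) {
            // Register the waiter BEFORE the id becomes eligible for loading.
            future = pending.get(id);
            if (future == null) {
                future = new CompletableFuture<>();
                pending.put(id, future);
                queued.add(id);
            }
            if (queued.size() >= batchSize) {
                batch = drain();
            }
        }
        if (batch != null) {
            load(batch); // run the lookup outside the lock
        }
        return future;
    }

    // Loads whatever is queued; intended to be called from a timer.
    public void flush() {
        List<String> batch;
        synchronized (this) {
            batch = drain();
        }
        if (!batch.isEmpty()) {
            load(batch);
        }
    }

    // Caller must hold the lock.
    private List<String> drain() {
        List<String> batch = new ArrayList<>(queued);
        queued.clear();
        return batch;
    }

    private void load(List<String> ids) {
        Map<String, String> found = batchLookup.apply(ids);
        for (String id : ids) {
            CompletableFuture<String> future;
            synchronized (this) {
                future = pending.remove(id);
            }
            if (future != null) {
                future.complete(found.get(id)); // completes with null if the id was not found
            }
        }
    }
}
```

Calling `flush()` periodically, for example from a `ScheduledExecutorService`, would play the role of the timeout half of `bufferTimeout`, while `batchSize` plays the role of its size limit.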