如何在 Flux 中迭代一个对象并对其进行操作?
How to iterate an object inside a Flux and do an operation on it?
我正在使用项目反应器,我想执行以下操作:
@Override
public void run(ApplicationArguments args) {
Flux.from(KafkaReceiver.create(receiverOptions)
.receive()
.map(this::getObject)
.flatMap(this::iterateElasticWrites)
.flatMap(this::writeTheWholeObjectToS3)
).subscribe();
}
// What I'd like to do - but a non reactive code
private Publisher<MyObj> iterateElasticWrites(MyObj message) {
for (MyDoc file: message.getDocs()) {
writeElasticDoc(file.getText());
}
return Mono.just(message);
}
我正在努力寻找 Project Reactor 中 iterateElasticWrites
的等价物。我想对我的对象 (MyObj
) 执行一次迭代,并将其每个文档列表的元素反应性地写入 elasticsearch。
在 Reactor 中,您总是需要使用不同的运算符构建反应流,并且所有 reactive/async 代码都应该 return Mono
或 Flux
.
看看你的例子,它可能看起来像
private Mono<MyObj> iterateElasticWrites(MyObj message) {
return Flux.fromIterable(message.getDocs())
.flatMap(doc -> writeElasticDoc(doc.getText()))
.then(Mono.just(message));
}
其中 writeElasticDoc
可以定义为
private Mono<Void> writeElasticDoc(String text) {
...
}
我正在使用项目反应器,我想执行以下操作:
@Override
public void run(ApplicationArguments args) {
Flux.from(KafkaReceiver.create(receiverOptions)
.receive()
.map(this::getObject)
.flatMap(this::iterateElasticWrites)
.flatMap(this::writeTheWholeObjectToS3)
).subscribe();
}
// What I'd like to do - but a non reactive code
private Publisher<MyObj> iterateElasticWrites(MyObj message) {
for (MyDoc file: message.getDocs()) {
writeElasticDoc(file.getText());
}
return Mono.just(message);
}
我正在努力寻找 Project Reactor 中 iterateElasticWrites
的等价物。我想对我的对象 (MyObj
) 执行一次迭代,并将其每个文档列表的元素反应性地写入 elasticsearch。
在 Reactor 中,您总是需要使用不同的运算符构建反应流,并且所有 reactive/async 代码都应该 return Mono
或 Flux
.
看看你的例子,它可能看起来像
private Mono<MyObj> iterateElasticWrites(MyObj message) {
return Flux.fromIterable(message.getDocs())
.flatMap(doc -> writeElasticDoc(doc.getText()))
.then(Mono.just(message));
}
其中 writeElasticDoc
可以定义为
private Mono<Void> writeElasticDoc(String text) {
...
}