spring webflux 流完成消费者
spring webflux stream Completion Consumer
我有 spring webflux 流消费者,它调用 REST 端点并使用接收到的消息并保存到 RDBMS。我正试图找到一种方法来批处理它。我看到 subscribe() 有一个重载方法,它在完成时被调用。我试图找到如何在调用此完成使用者时获取数据,因为我正在调用类型为 Runnable 的 CompletionConsumer,而我所拥有的只是 运行() 方法,它不采用任何参数。
**CLIENT**
WebClient.create("http://localhost:8080")
.get()
.uri("/objects")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(clientResponse ->clientResponse.bodyToFlux(MyObject.class))
.subscribe(null,null,completionProcessorSubscriber);
**COMPLETION SUBSCRIBER**
@Service
public class CompletionProcessorSubscriber implements Runnable{
@Autowired
LegacyDAOImpl dao;
Logger logger = LoggerFactory.getLogger(CompletionProcessorSubscriber.class);
public void run() {
logger.info("\ninside RUNNNNNNNNN\n\n");
// here how to get hold of the data stream ?
}
Below is the Documentation from the Flux API
*/
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, null);
}
您应该避免向订阅者方法添加太多逻辑。相反,您应该利用 Flux API.
提供的丰富的运算符集
在这种情况下,您需要的运算符是 buffer
to collect batches and concatMap
以按顺序执行批处理。
在下面的示例中,我假设 LegacyDAOImpl 是一个阻塞服务,其工作应该分配给适当的线程池。
public static void main(String[] args) throws InterruptedException
{
webClient.get()
.uri("/objects")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyObject.class))
.buffer(100) // batch size
.concatMap(batchOfMyObjects -> Mono.fromRunnable(() -> legacyDAOImpl.saveAll(batchOfMyObjects))
.subscribeOn(Schedulers.elastic())) // blocking IO goes to elastic thread pool
.subscribe();
}
private static class LegacyDAOImpl
{
public void saveAll(List<MyObject> myObjects)
{
// save here
}
}
private static class MyObject
{
}
我有 spring webflux 流消费者,它调用 REST 端点并使用接收到的消息并保存到 RDBMS。我正试图找到一种方法来批处理它。我看到 subscribe() 有一个重载方法,它在完成时被调用。我试图找到如何在调用此完成使用者时获取数据,因为我正在调用类型为 Runnable 的 CompletionConsumer,而我所拥有的只是 运行() 方法,它不采用任何参数。
**CLIENT**
WebClient.create("http://localhost:8080")
.get()
.uri("/objects")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(clientResponse ->clientResponse.bodyToFlux(MyObject.class))
.subscribe(null,null,completionProcessorSubscriber);
**COMPLETION SUBSCRIBER**
@Service
public class CompletionProcessorSubscriber implements Runnable{
@Autowired
LegacyDAOImpl dao;
Logger logger = LoggerFactory.getLogger(CompletionProcessorSubscriber.class);
public void run() {
logger.info("\ninside RUNNNNNNNNN\n\n");
// here how to get hold of the data stream ?
}
Below is the Documentation from the Flux API
*/
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, null);
}
您应该避免向订阅者方法添加太多逻辑。相反,您应该利用 Flux API.
提供的丰富的运算符集在这种情况下,您需要的运算符是 buffer
to collect batches and concatMap
以按顺序执行批处理。
在下面的示例中,我假设 LegacyDAOImpl 是一个阻塞服务,其工作应该分配给适当的线程池。
public static void main(String[] args) throws InterruptedException
{
webClient.get()
.uri("/objects")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyObject.class))
.buffer(100) // batch size
.concatMap(batchOfMyObjects -> Mono.fromRunnable(() -> legacyDAOImpl.saveAll(batchOfMyObjects))
.subscribeOn(Schedulers.elastic())) // blocking IO goes to elastic thread pool
.subscribe();
}
private static class LegacyDAOImpl
{
public void saveAll(List<MyObject> myObjects)
{
// save here
}
}
private static class MyObject
{
}