spring webflux 流完成消费者

spring webflux stream Completion Consumer

我有 spring webflux 流消费者,它调用 REST 端点并使用接收到的消息并保存到 RDBMS。我正试图找到一种方法来批处理它。我看到 subscribe() 有一个重载方法,它在完成时被调用。我试图找到如何在调用此完成使用者时获取数据,因为我正在调用类型为 Runnable 的 CompletionConsumer,而我所拥有的只是 运行() 方法,它不采用任何参数。

**CLIENT**

       WebClient.create("http://localhost:8080")
                .get()
                .uri("/objects")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .exchange()
                .flatMapMany(clientResponse ->clientResponse.bodyToFlux(MyObject.class))
               .subscribe(null,null,completionProcessorSubscriber);


**COMPLETION SUBSCRIBER**


@Service
public class CompletionProcessorSubscriber implements  Runnable{

    @Autowired
    LegacyDAOImpl dao;

    Logger logger = LoggerFactory.getLogger(CompletionProcessorSubscriber.class);


    public void run() {

        logger.info("\ninside RUNNNNNNNNN\n\n");
// here how to get hold of the data stream ?
    }

Below is the  Documentation from the Flux API

 */
    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer) {
        return subscribe(consumer, errorConsumer, completeConsumer, null);
    }

您应该避免向订阅者方法添加太多逻辑。相反,您应该利用 Flux API.

提供的丰富的运算符集

在这种情况下,您需要的运算符是 buffer to collect batches and concatMap 以按顺序执行批处理。

在下面的示例中,我假设 LegacyDAOImpl 是一个阻塞服务,其工作应该分配给适当的线程池。

public static void main(String[] args) throws InterruptedException
{
    webClient.get()
             .uri("/objects")
             .accept(MediaType.TEXT_EVENT_STREAM)
             .exchange()
             .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyObject.class))
             .buffer(100) // batch size
             .concatMap(batchOfMyObjects -> Mono.fromRunnable(() -> legacyDAOImpl.saveAll(batchOfMyObjects))
                                                .subscribeOn(Schedulers.elastic())) // blocking IO goes to elastic thread pool
             .subscribe();
}

private static class LegacyDAOImpl
{
    public void saveAll(List<MyObject> myObjects)
    {
        // save here
    }
}

private static class MyObject
{
}