使用 Reactive Mongo 和 Web 客户端的非阻塞功能方法

Non-blocking functional methods with Reactive Mongo and Web client

我有一个微服务,它使用 ReactiveMongoRepository 接口从数据库中读取对象。

目标是获取这些对象中的每一个并将其推送到 AWS Lambda 函数(在将其转换为 DTO 之后)。如果该 lambda 函数的结果在 200 范围内,则将该对象标记为成功,否则忽略。

在过去的简单 Mongo 存储库和 RestTemplate 中,这将是一项微不足道的任务。但是,我正在尝试了解此 Reactive 交易,并避免阻塞。

这是我想出的代码,我知道我在 webClient 上阻塞了,但我该如何避免呢?

@Override
public Flux<Video> index() {
    return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        // Blocking call
        final HttpStatus httpStatus = webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO)).exchange()
                .block()
                .statusCode();

        if (httpStatus.is2xxSuccessful()) {
            video.setIndexed(true);
        }

        return videoRepository.save(video);
    });
}

我是从计划任务中调用上面的代码,我并不关心 index() 方法的实际结果,只关心期间发生的事情。

@Scheduled(fixedDelay = 60000)
public void indexTask() {
    indexService
            .index()
            .log()
            .subscribe();
}

我已经阅读了很多关于该主题的博客文章等,但它们都只是简单的 CRUD 操作,中间没有发生任何事情,所以不要真正让我全面了解如何实现这些东西。

有什么帮助吗?

您的解决方案实际上非常接近。 在这些情况下,您应该尝试逐步分解反应链,并且为了清楚起见毫不犹豫地将位转换为独立的方法。

@Override
public Flux<Video> index() {

    Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
    return unindexedVideos.flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        Mono<ClientResponse> indexedResponse = webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO)).exchange()
            .filter(res -> res.statusCode().is2xxSuccessful());

        return indexedResponse.flatMap(response -> {
            video.setIndexed(true);
            return videoRepository.save(video);
        });
    });

我的方法,可能更具可读性。但我承认我没有 运行 它所以不能 100% 保证它会起作用。

public Flux<Video> index() {
    return videoRepository.findAll()
        .flatMap(this::callLambda)
        .flatMap(videoRepository::save);
}

private Mono<Video> callLambda(final Video video) {
    SearchDTO searchDTO = new SearchDTO(video);
    return webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO))
            .exchange()
            .map(ClientResponse::statusCode)
            .filter(HttpStatus::is2xxSuccessful)
            .map(t -> {
                video.setIndexed(true);
                return video;
            });
}