Java 单声道重复调用直到收集的结果竞争

Java mono repeat call until collected results compete

我从 C# 转过来后开始学习 Java/Reactor。我精通 C# async-await 非阻塞调用方法,并且正在努力适应 Flux/Mono.

我正在实施一个解决方案,我需要通过 Java SDK 调用 ElasticSearch,获取结果,应用额外的过滤器以去除 ES 结果,并在 ES 中继续分页直到我最终收集的结果是完整的。

ES SDK 不支持 Reactor,但有一些 Java 适配器代码的示例采用 ES 回调并转换为单声道(我在这里看到与 C# async-await 的直接关联,因为是对 ES 的非阻塞调用)。然后我挣扎的是下一点 - 我需要从 ES mono 中获取结果,过滤它们。

我通过调用其他外部服务以根据 ES 调用的结果获取额外数据来执行此操作,因此在应用过滤之前,我需要知道 ES mono 结果的每一页内容的 ID (实际上是一种块),然后应用内存中的过滤器,如果我没有足够的内容,则返回 ES 获取下一页...重复直到我有足够的数据或没有更多结果来自 ES.

与 C# 相比,这似乎很难实现,但我可能只是没有正确理解 Java 范例。

我的问题是我不能使用“block()”,因为这会在 Reactor 3.2 中引发错误,所以我真的不知道如何“等待”直到对 ES 和外部服务的单声道调用完成直到继续。在 C# 中,这就像调用带有 await 的异步方法来处理隐式回调一样简单

我的阻塞版本(在 IntellJ 中工作,通过 maven 发布时失败,然后在网络服务器中 运行)有效:

do {
 var sr = GetSearchRequest(xxxx);
 this.elasticsearch.results(sr)
   .map(r -> chunk.addAdd(r))
   .block();

 if (chunk.size() == 0 {
   isComplete = true;
 }
 else {
  var filtered = postFilterResults(chunk);
  finalResults.add(filtered);
  if (finalResults.size() = MAXIMUM_RESULTS) {
    isComplete = true;
  }
  esPage = esPage + 1;
while (isComplete == false);

如果我尝试 subscribe() 或其他非阻塞 reaktor 调用,那么(显然)代码会跳过“get ES”位并点击 do-while,重复循环直到 ES 的回调最终发生并调用订阅的地图。

我想我需要为每个 ES 调用执行一个“异步块”,但我不知道如何。

回答我自己的问题... IMO 的根本问题是 Flux/Mono 根本不像任何现有的编程风格,因为它绝对迫使您在 reactor 要求的流畅风格下工作。这与 C# Linq 非常相似,但它几乎是一个“假朋友”,因为即使像循环这样的东西也需要在 Reactor 中。

在这种情况下,要解决的关键问题之一是分页并在循环中继续这样做。非常不清楚如何实现这一点,因为对通量的订阅“锁定”了原始参数,因此重复订阅调用只会再次获得相同的页面。解决方案是使用 Flux.defer 方法,该方法在每次重复调用 时强制延迟构建订阅 。然后,您需要原子整数来跟踪不同调用的页面计数器。同样,这是 C# 为您处理的事情,因此它可以赶上 .net 开发人员。

类似于:

//The response from the elasticsearch adapter is a Flux<T> but we do not want to filter
//results on a row by row basis as this incurs one call for each row to the DB/Network
//(as appropriate). We choose to batch these up

var result = new SearchResult();
var page = new AtomicInteger();
var chunkSize = new AtomicInteger();

//Use a defer so we recalculate the subscription to the search with the new page count
var results = Flux.defer(() -> elasticsearch.results(GetSearchRequest(request, lc, pf, page.get()))
       .doOnComplete(() -> {
           chunkSize.set(0);
           page.getAndAdd(1);
       })
       .collectList()
       .map(chunk -> {
           chunkSize.set(chunk.size());
           return chunk;
       })
       .map(chunk -> postFilterResults(request, chunk, pf))
       .map(filtered -> result.getDocuments().addAll(filtered)));


//Repeat the deferred flux (recalculating each time) until we have enough content or we don't get anything from the search engine
return results
        .repeat()
        .takeUntil(r -> chunkSize.get() == 0 || result.getDocuments().size() >= this.elasticsearch.getMaximumSearchResults())
        .take(this.elasticsearch.getMaximumSearchResults())
        .collectList()
        .flatMap(r -> {
            result.setTotalHits(result.getDocuments().size());
            return Mono.just(result);
        });