使用 Temp Lookup Map 的反应式方法

Reactive Approach To Using Temp Lookup Map

我想使用 RxJava 或 Proj 使现有代码更具反应性。反应堆。我有点挣扎。

给定一个 CatalogRequest 列表,其中一些可能针对相同的 CatalogItem 但具有不同的条件(NEW 与不同级别的 USED),我想:

  1. 通过 itemId 获取不同的 CatalogItems。所有条件的数据都存储在同一行中,所以我只需要对同一项目进行一次请求。

  2. 一旦我有了不同的项目,浏览我的请求列表并执行一些逻辑,将这些请求转换为我刚刚查找的不同的 CatalogItem。

    class CatalogRequest {
        String itemId;
        Integer condition;
    }
    
    //class does not look like this, simplified for the question
    class CatalogItem {
         Map<Condition, Money> usedPrices;
         Map<Condition, DateTime> availableDate;
    }
    
    //non reactive code
    public List<CatalogResponse> fetchResponses(List<CatalogRequest> requests) {
    List<CatalogItem> items = requests.stream().map(CatalogRequest::getItemId)
        .distinct()
        .collect(toList());
    
    List<Optional<CatalogItem>> cachedResults = items.stream()
        .map (this::fetchItem)
        .map (CompletableFuture::join)
        .filter(Optional::isPresent)
        .collect(toList());
    
    List<CatalogResponse> responses = new ArrayList<>();
    requests.forEach( request -> 
        {
             CatalogResponse response = transformResponse(request, cachedResults.get(request.getItemId());
             responses.add(response);
        });
    }
    

恕我直言,您的算法和用例不适合响应式编程模式。这是因为它 "joins" 来自 2 个集合(请求和缓存的响应)的信息,并且只能在处理完所有数据后才能执行此操作,而不是可以逐项逐步完成的处理。

它甚至不能通过组合像 zip 这样的运算符轻松解决,因为数据被过滤并且不能保证以相同的顺序或基数出现。

(顺便说一下,我认为 问题 cachedResults.get(request.getItemId())列表的索引)。

所以转换 fetchResponses 的第一步很容易,但最后一步就不那么容易了。

作为参考,以下是我将如何迁移部分代码:

您可以将 fetchItem() 更改为 return a Mono(例如,使用 Mono.fromCallable(),具体取决于 fetch 的实际作用)。请注意,如果 fetch 在内部使用 ExecutorService,那么您需要确保您的 Mono 也在另一个线程上下文中类似地执行,例如使用 .subscribeOn(Schedulers.elastic())...

然后就可以开始迁移fetchResponses中的代码了。我不认为从 CatalogRequests 的集合开始是一个问题,如果你在没有任何特定延迟的情况下将它们收集在内存中。如果没有,您也可以随时迁移父代码以使用 Flux<CatalogRequest> 调用该方法并异步响应每个请求。

可以使用 Flux.fromIterable(requests) 将列表转换为 Flux,然后使用 .distinct() 过滤重复项。

这里有一个变化:先不要收集,但继续构建你的异步序列。

执行 .flatMap(this::fetchItem) 以异步检索行。您甚至可能不需要此方法来 return 和 Optional,因为空的 flux/mono 是等效的,并且会被 flatMap.

忽略

但在这一点上,您有点难以恢复到 .collectList().block() 并重新使用命令式代码。

如果您能找到一种声明式的方式来执行相同的任务(考虑使用单个 Stream),那么事情将更适合响应式编程。