使用 Temp Lookup Map 的反应式方法
Reactive Approach To Using Temp Lookup Map
我想使用 RxJava 或 Proj 使现有代码更具反应性。反应堆。我有点挣扎。
给定一个 CatalogRequest 列表,其中一些可能针对相同的 CatalogItem 但具有不同的条件(NEW 与不同级别的 USED),我想:
通过 itemId 获取不同的 CatalogItems。所有条件的数据都存储在同一行中,所以我只需要对同一项目进行一次请求。
一旦我有了不同的项目,浏览我的请求列表并执行一些逻辑,将这些请求转换为我刚刚查找的不同的 CatalogItem。
class CatalogRequest {
String itemId;
Integer condition;
}
//class does not look like this, simplified for the question
class CatalogItem {
Map<Condition, Money> usedPrices;
Map<Condition, DateTime> availableDate;
}
//non reactive code
public List<CatalogResponse> fetchResponses(List<CatalogRequest> requests) {
List<CatalogItem> items = requests.stream().map(CatalogRequest::getItemId)
.distinct()
.collect(toList());
List<Optional<CatalogItem>> cachedResults = items.stream()
.map (this::fetchItem)
.map (CompletableFuture::join)
.filter(Optional::isPresent)
.collect(toList());
List<CatalogResponse> responses = new ArrayList<>();
requests.forEach( request ->
{
CatalogResponse response = transformResponse(request, cachedResults.get(request.getItemId());
responses.add(response);
});
}
恕我直言,您的算法和用例不适合响应式编程模式。这是因为它 "joins" 来自 2 个集合(请求和缓存的响应)的信息,并且只能在处理完所有数据后才能执行此操作,而不是可以逐项逐步完成的处理。
它甚至不能通过组合像 zip
这样的运算符轻松解决,因为数据被过滤并且不能保证以相同的顺序或基数出现。
(顺便说一下,我认为 问题 与 cachedResults.get(request.getItemId())
列表的索引)。
所以转换 fetchResponses
的第一步很容易,但最后一步就不那么容易了。
作为参考,以下是我将如何迁移部分代码:
您可以将 fetchItem()
更改为 return a Mono
(例如,使用 Mono.fromCallable()
,具体取决于 fetch 的实际作用)。请注意,如果 fetch 在内部使用 ExecutorService
,那么您需要确保您的 Mono 也在另一个线程上下文中类似地执行,例如使用 .subscribeOn(Schedulers.elastic())
...
然后就可以开始迁移fetchResponses
中的代码了。我不认为从 CatalogRequests
的集合开始是一个问题,如果你在没有任何特定延迟的情况下将它们收集在内存中。如果没有,您也可以随时迁移父代码以使用 Flux<CatalogRequest>
调用该方法并异步响应每个请求。
可以使用 Flux.fromIterable(requests)
将列表转换为 Flux
,然后使用 .distinct()
过滤重复项。
这里有一个变化:先不要收集,但继续构建你的异步序列。
执行 .flatMap(this::fetchItem)
以异步检索行。您甚至可能不需要此方法来 return 和 Optional
,因为空的 flux/mono 是等效的,并且会被 flatMap
.
忽略
但在这一点上,您有点难以恢复到 .collectList().block()
并重新使用命令式代码。
如果您能找到一种声明式的方式来执行相同的任务(考虑使用单个 Stream
),那么事情将更适合响应式编程。
我想使用 RxJava 或 Proj 使现有代码更具反应性。反应堆。我有点挣扎。
给定一个 CatalogRequest 列表,其中一些可能针对相同的 CatalogItem 但具有不同的条件(NEW 与不同级别的 USED),我想:
通过 itemId 获取不同的 CatalogItems。所有条件的数据都存储在同一行中,所以我只需要对同一项目进行一次请求。
一旦我有了不同的项目,浏览我的请求列表并执行一些逻辑,将这些请求转换为我刚刚查找的不同的 CatalogItem。
class CatalogRequest { String itemId; Integer condition; } //class does not look like this, simplified for the question class CatalogItem { Map<Condition, Money> usedPrices; Map<Condition, DateTime> availableDate; } //non reactive code public List<CatalogResponse> fetchResponses(List<CatalogRequest> requests) { List<CatalogItem> items = requests.stream().map(CatalogRequest::getItemId) .distinct() .collect(toList()); List<Optional<CatalogItem>> cachedResults = items.stream() .map (this::fetchItem) .map (CompletableFuture::join) .filter(Optional::isPresent) .collect(toList()); List<CatalogResponse> responses = new ArrayList<>(); requests.forEach( request -> { CatalogResponse response = transformResponse(request, cachedResults.get(request.getItemId()); responses.add(response); }); }
恕我直言,您的算法和用例不适合响应式编程模式。这是因为它 "joins" 来自 2 个集合(请求和缓存的响应)的信息,并且只能在处理完所有数据后才能执行此操作,而不是可以逐项逐步完成的处理。
它甚至不能通过组合像 zip
这样的运算符轻松解决,因为数据被过滤并且不能保证以相同的顺序或基数出现。
(顺便说一下,我认为 问题 与 cachedResults.get(request.getItemId())
列表的索引)。
所以转换 fetchResponses
的第一步很容易,但最后一步就不那么容易了。
作为参考,以下是我将如何迁移部分代码:
您可以将 fetchItem()
更改为 return a Mono
(例如,使用 Mono.fromCallable()
,具体取决于 fetch 的实际作用)。请注意,如果 fetch 在内部使用 ExecutorService
,那么您需要确保您的 Mono 也在另一个线程上下文中类似地执行,例如使用 .subscribeOn(Schedulers.elastic())
...
然后就可以开始迁移fetchResponses
中的代码了。我不认为从 CatalogRequests
的集合开始是一个问题,如果你在没有任何特定延迟的情况下将它们收集在内存中。如果没有,您也可以随时迁移父代码以使用 Flux<CatalogRequest>
调用该方法并异步响应每个请求。
可以使用 Flux.fromIterable(requests)
将列表转换为 Flux
,然后使用 .distinct()
过滤重复项。
这里有一个变化:先不要收集,但继续构建你的异步序列。
执行 .flatMap(this::fetchItem)
以异步检索行。您甚至可能不需要此方法来 return 和 Optional
,因为空的 flux/mono 是等效的,并且会被 flatMap
.
但在这一点上,您有点难以恢复到 .collectList().block()
并重新使用命令式代码。
如果您能找到一种声明式的方式来执行相同的任务(考虑使用单个 Stream
),那么事情将更适合响应式编程。