如何使用 Akka 组合 Source.repeat 和 Source.completionStage
How to combine Source.repeat and Source.completionStage using Akka
我正在使用带有微服务框架的 akka,所以我收到了很多 completionStage 请求。我想从一个微服务中获取一个元素列表,并将它们与另一个微服务中的单个元素压缩在一起,这样我最终得到一个 Source of Pair.
我无法使用普通 zip 执行此操作,因为 Source.zip 在两个源之一完成后立即完成,所以我最终只传播了一个元素。
我不能使用 Source.zipAll,因为这需要我提前定义默认元素。
如果我已经提前有了单个元素,我可以使用 Source.repeat 让它重复传播那个元素,这意味着 Source.zip 会在元素列表完成时完成,但 Source.repeat 无法完成阶段或 Source.completionStage.
我目前的策略是在 mapConcat 列表元素之前将所有内容压缩在一起。
Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());
return Source.completionStage(anotherService.getListOfElements().invoke)
.zip(singleElement)
.flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));
这最终达到了我想要的效果,但我觉得有很多不必要的重复和同步移动数据。有没有更好的方法来解决我所缺少的这个问题?
你为什么不合并 CompletionStage
然后将它们提供给 Akka 流?
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
完整的 PoC - 使用睡眠超时来完成一个或另一个 CompletionStage
首先:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class CompletionStages {
CompletionStage<String> getSingleElement() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "Single Element";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
CompletionStage<List<String>> getListOfElements() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
return Arrays.asList("One", "Two", "Three");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
CompletionStage<Done> run(ActorSystem system) {
return execute().runWith(Sink.foreach(System.out::println), system);
}
public static void main(String... args) {
final ActorSystem system = ActorSystem.create();
new CompletionStages().run(system)
.thenRun(system::terminate);
}
}
flatMapConcat
运算符应该允许您构造一个 Source.repeat
,它会在知道单个元素后重复该元素。在 Scala 中(Source.future
相当于 Source.completionStage
的 Scala:我对 Java lambda 语法不够熟悉,无法在 Java 中回答):
val singleElement = Source.future(oneService.getSingleElement)
Source.future(anotherService.getListOfElements)
.mapConcat(lst => lst) // unspool the list
.zip(singleElement.flatMapConcat(element => Source.repeat(element)))
我正在使用带有微服务框架的 akka,所以我收到了很多 completionStage 请求。我想从一个微服务中获取一个元素列表,并将它们与另一个微服务中的单个元素压缩在一起,这样我最终得到一个 Source of Pair.
我无法使用普通 zip 执行此操作,因为 Source.zip 在两个源之一完成后立即完成,所以我最终只传播了一个元素。
我不能使用 Source.zipAll,因为这需要我提前定义默认元素。
如果我已经提前有了单个元素,我可以使用 Source.repeat 让它重复传播那个元素,这意味着 Source.zip 会在元素列表完成时完成,但 Source.repeat 无法完成阶段或 Source.completionStage.
我目前的策略是在 mapConcat 列表元素之前将所有内容压缩在一起。
Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());
return Source.completionStage(anotherService.getListOfElements().invoke)
.zip(singleElement)
.flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));
这最终达到了我想要的效果,但我觉得有很多不必要的重复和同步移动数据。有没有更好的方法来解决我所缺少的这个问题?
你为什么不合并 CompletionStage
然后将它们提供给 Akka 流?
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
完整的 PoC - 使用睡眠超时来完成一个或另一个 CompletionStage
首先:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class CompletionStages {
CompletionStage<String> getSingleElement() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "Single Element";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
CompletionStage<List<String>> getListOfElements() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
return Arrays.asList("One", "Two", "Three");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
CompletionStage<Done> run(ActorSystem system) {
return execute().runWith(Sink.foreach(System.out::println), system);
}
public static void main(String... args) {
final ActorSystem system = ActorSystem.create();
new CompletionStages().run(system)
.thenRun(system::terminate);
}
}
flatMapConcat
运算符应该允许您构造一个 Source.repeat
,它会在知道单个元素后重复该元素。在 Scala 中(Source.future
相当于 Source.completionStage
的 Scala:我对 Java lambda 语法不够熟悉,无法在 Java 中回答):
val singleElement = Source.future(oneService.getSingleElement)
Source.future(anotherService.getListOfElements)
.mapConcat(lst => lst) // unspool the list
.zip(singleElement.flatMapConcat(element => Source.repeat(element)))