如何访问Akka Stream的计算结果?
How to access the computation result of an Akka Stream?
我正在尝试 return 流操作的结果,在本例中是:
- 总结一个列表
- 平方值
- 平方值
表示为:
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
访问我使用的值
final AtomicInteger returnValue = new AtomicInteger();
其次是:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
需要阻塞调用以允许流完成,这是不可接受的:
Thread.sleep(2000);
如果我使用:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
错误是 returned :
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
在这种情况下,收件人演员是来源,return在 Done.done
消息中包含以下内容:
return Optional.of(CompletionStrategy.immediately());
可以使用 Akka 流来 return 来自流的计算值吗?唯一的选择是将计算值存储在数据库中,或者在计算值时将其发送到 Kafka 主题:
.to(Sink.foreach(x -> {
?
完整的源代码:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String args[]) throws InterruptedException, ExecutionException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
}
else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
final AtomicInteger returnValue = new AtomicInteger();
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
.run(system);
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
Thread.sleep(2000);
System.out.println("returnValue is "+returnValue);
}
}
我认为您可能缺少了解 Akka Streams 中 物化值 的概念。扫描 this part of the documentation especially around combining materialized values. I also had a go at trying to explain this concept here(搜索 物化值 )。如果你理解物化价值,那么也许我在这里写的会更有意义。
调用 Source.actorRef(..)
returns Source<T, ActorRef>
其中 T 是流经流的元素的数据类型(在您的例子中是 Integer
)和 ActorRef
是那个 Source
的 物化值 。当您在 RunnableGraph
上调用 run
时,您会同步获得物化值,这就是 to(...)
调用 returns.
ActorRef
是您可以根据 Source.actorRef(...)
语义“驱动”流的方式。
现在的问题是如何获取通过流的数据。在您的情况下,您将所有 Integers
缩减为一个,因此您可以使用 Sink.head 而不是使用对副作用有益的 Sink.foreach(...)
。你看,Sink
s 也可以产生 物化值 并且在 Sink.head
的情况下它物化为流中第一个元素的 CompletionStage
,它在你的情况下是唯一的元素。那么让我们试试看:
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.head())
.run(system);
好吧,这并没有太大帮助。您仍然只获得 Source
的物化值。要获得 Sink
的物化值,我们需要明确要求它:
final Pair<ActorRef, CompletionStage<Integer>> matVals =
source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.toMat(Sink.head(), Keep.both())
.run(system);
现在我们得到了 Source
和 Sink
的物化值。您可以像以前一样通过 ActorRef
驱动您的流:
final ActorRef actorRef = matVals.first();
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
您还可以使用 CompletableStage
API 从流中获取您的价值。像这样说:
Integer folded = matVals.second().toCompletableFuture().join();
是的,这是阻塞的,但您需要以某种方式阻止主线程在流运行完成之前完成。
我正在尝试 return 流操作的结果,在本例中是:
- 总结一个列表
- 平方值
- 平方值
表示为:
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
访问我使用的值
final AtomicInteger returnValue = new AtomicInteger();
其次是:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
需要阻塞调用以允许流完成,这是不可接受的:
Thread.sleep(2000);
如果我使用:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
错误是 returned :
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
在这种情况下,收件人演员是来源,return在 Done.done
消息中包含以下内容:
return Optional.of(CompletionStrategy.immediately());
可以使用 Akka 流来 return 来自流的计算值吗?唯一的选择是将计算值存储在数据库中,或者在计算值时将其发送到 Kafka 主题:
.to(Sink.foreach(x -> {
?
完整的源代码:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String args[]) throws InterruptedException, ExecutionException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
}
else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
final AtomicInteger returnValue = new AtomicInteger();
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
.run(system);
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
Thread.sleep(2000);
System.out.println("returnValue is "+returnValue);
}
}
我认为您可能缺少了解 Akka Streams 中 物化值 的概念。扫描 this part of the documentation especially around combining materialized values. I also had a go at trying to explain this concept here(搜索 物化值 )。如果你理解物化价值,那么也许我在这里写的会更有意义。
调用 Source.actorRef(..)
returns Source<T, ActorRef>
其中 T 是流经流的元素的数据类型(在您的例子中是 Integer
)和 ActorRef
是那个 Source
的 物化值 。当您在 RunnableGraph
上调用 run
时,您会同步获得物化值,这就是 to(...)
调用 returns.
ActorRef
是您可以根据 Source.actorRef(...)
语义“驱动”流的方式。
现在的问题是如何获取通过流的数据。在您的情况下,您将所有 Integers
缩减为一个,因此您可以使用 Sink.head 而不是使用对副作用有益的 Sink.foreach(...)
。你看,Sink
s 也可以产生 物化值 并且在 Sink.head
的情况下它物化为流中第一个元素的 CompletionStage
,它在你的情况下是唯一的元素。那么让我们试试看:
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.head())
.run(system);
好吧,这并没有太大帮助。您仍然只获得 Source
的物化值。要获得 Sink
的物化值,我们需要明确要求它:
final Pair<ActorRef, CompletionStage<Integer>> matVals =
source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.toMat(Sink.head(), Keep.both())
.run(system);
现在我们得到了 Source
和 Sink
的物化值。您可以像以前一样通过 ActorRef
驱动您的流:
final ActorRef actorRef = matVals.first();
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
您还可以使用 CompletableStage
API 从流中获取您的价值。像这样说:
Integer folded = matVals.second().toCompletableFuture().join();
是的,这是阻塞的,但您需要以某种方式阻止主线程在流运行完成之前完成。