如何访问Akka Stream的计算结果?

How to access the computation result of an Akka Stream?

我正在尝试 return 流操作的结果,在本例中是:

  1. 总结一个列表
  2. 平方值
  3. 平方值

表示为:

        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)

访问我使用的值

final AtomicInteger returnValue = new AtomicInteger();

其次是:

        .to(Sink.foreach(x -> {
            returnValue.set(x);
            System.out.println("got: " + x);
        }))

需要阻塞调用以允许流完成,这是不可接受的:

Thread.sleep(2000);

如果我使用:

    CompletableFuture<Object> futureValue =
            ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
    System.out.println(futureValue.toCompletableFuture().get().toString());

错误是 returned :

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

在这种情况下,收件人演员是来源,return在 Done.done 消息中包含以下内容:

return Optional.of(CompletionStrategy.immediately());

可以使用 Akka 流来 return 来自流的计算值吗?唯一的选择是将计算值存储在数据库中,或者在计算值时将其发送到 Kafka 主题:

.to(Sink.foreach(x -> {

?

完整的源代码:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {


        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            }
                            else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
                .fold(0, (aggr, next) -> aggr + next)
                .map(x -> x * x)
                .map(x -> x * x)
                .to(Sink.foreach(x -> {
                    returnValue.set(x);
                    System.out.println("got: " + x);
                }))
                .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);

        System.out.println("returnValue is "+returnValue);

    }
}

我认为您可能缺少了解 Akka Streams 中 物化值 的概念。扫描 this part of the documentation especially around combining materialized values. I also had a go at trying to explain this concept here(搜索 物化值 )。如果你理解物化价值,那么也许我在这里写的会更有意义。

调用 Source.actorRef(..) returns Source<T, ActorRef> 其中 T 是流经流的元素的数据类型(在您的例子中是 Integer)和 ActorRef 是那个 Source 物化值 。当您在 RunnableGraph 上调用 run 时,您会同步获得物化值,这就是 to(...) 调用 returns.

ActorRef 是您可以根据 Source.actorRef(...) 语义“驱动”流的方式。

现在的问题是如何获取通过流的数据。在您的情况下,您将所有 Integers 缩减为一个,因此您可以使用 Sink.head 而不是使用对副作用有益的 Sink.foreach(...)。你看,Sinks 也可以产生 物化值 并且在 Sink.head 的情况下它物化为流中第一个元素的 CompletionStage,它在你的情况下是唯一的元素。那么让我们试试看:

final ActorRef actorRef = source
                                .fold(0, (aggr, next) -> aggr + next)
                                .map(x -> x * x)
                                .map(x -> x * x)
                                .to(Sink.head())
                                .run(system);

好吧,这并没有太大帮助。您仍然只获得 Source 的物化值。要获得 Sink 的物化值,我们需要明确要求它:

final Pair<ActorRef, CompletionStage<Integer>> matVals =
      source
        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)
        .toMat(Sink.head(), Keep.both())
        .run(system);

现在我们得到了 SourceSink 的物化值。您可以像以前一样通过 ActorRef 驱动您的流:

final ActorRef actorRef = matVals.first();

Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());

您还可以使用 CompletableStage API 从流中获取您的价值。像这样说:

Integer folded = matVals.second().toCompletableFuture().join(); 

是的,这是阻塞的,但您需要以某种方式阻止主线程在流运行完成之前完成。