将数据发送到 Akka Stream

Sending data to an Akka Stream

我正在尝试将 ArrayList 发送到 Akka 流进行处理。流处理列表如下:

1. Sum the list
2. Square the result of the sum. 

我已经定义了下面的代码来尝试实现这个结果:

public class SumListStream {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        ActorRef actorRef = source
                        .fold(0, (aggr, next) -> aggr + next)
                        .map(x -> x * x)
                        .to(Sink.foreach(x -> System.out.println("got: " + x)))
                        .run(system);

        actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender());

        actorRef.tell(
                new akka.actor.Status.Success(CompletionStrategy.draining()), ActorRef.noSender());

    }
}

使用 actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender()); 向流发送数据似乎没有任何影响,因为执行上述代码的结果是:

10:08:25.896 [StreamsExamples-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started

我是否正确实现了源代码?

我认为第一个错误是您将列表发送给期望 Integer 的演员,根据来源输入。一次发送一个整数,例如:

Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));

然后,fold 在发出结果之前等待源终止。您将 actor 设置为期望 Done 终止,但您正在发送它 Status.Success。尝试:

actorRef.tell(Done.done(), ActorRef.noSender());

我不是 Akka 专家,但通过这 2 处修改,您的代码会产生预期结果 (1 + 2 + 3)^2 = 36。您永远不会停止 actor 系统,因此程序会永远运行, 但原则是有的。

我不确定您的确切要求,但这是在 Akka Streams:

中流式传输列表的一种更简单的方法

Source.from(Arrays.asList(1,2,3))