将数据发送到 Akka Stream
Sending data to an Akka Stream
我正在尝试将 ArrayList 发送到 Akka 流进行处理。流处理列表如下:
1. Sum the list
2. Square the result of the sum.
我已经定义了下面的代码来尝试实现这个结果:
public class SumListStream {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String[] args) throws ExecutionException, InterruptedException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.to(Sink.foreach(x -> System.out.println("got: " + x)))
.run(system);
actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender());
actorRef.tell(
new akka.actor.Status.Success(CompletionStrategy.draining()), ActorRef.noSender());
}
}
使用 actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender());
向流发送数据似乎没有任何影响,因为执行上述代码的结果是:
10:08:25.896 [StreamsExamples-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
我是否正确实现了源代码?
我认为第一个错误是您将列表发送给期望 Integer
的演员,根据来源输入。一次发送一个整数,例如:
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
然后,fold 在发出结果之前等待源终止。您将 actor 设置为期望 Done
终止,但您正在发送它 Status.Success
。尝试:
actorRef.tell(Done.done(), ActorRef.noSender());
我不是 Akka 专家,但通过这 2 处修改,您的代码会产生预期结果 (1 + 2 + 3)^2 = 36。您永远不会停止 actor 系统,因此程序会永远运行, 但原则是有的。
我不确定您的确切要求,但这是在 Akka Streams:
中流式传输列表的一种更简单的方法
Source.from(Arrays.asList(1,2,3))
我正在尝试将 ArrayList 发送到 Akka 流进行处理。流处理列表如下:
1. Sum the list
2. Square the result of the sum.
我已经定义了下面的代码来尝试实现这个结果:
public class SumListStream {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String[] args) throws ExecutionException, InterruptedException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.to(Sink.foreach(x -> System.out.println("got: " + x)))
.run(system);
actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender());
actorRef.tell(
new akka.actor.Status.Success(CompletionStrategy.draining()), ActorRef.noSender());
}
}
使用 actorRef.tell(Arrays.asList(1,2,3), ActorRef.noSender());
向流发送数据似乎没有任何影响,因为执行上述代码的结果是:
10:08:25.896 [StreamsExamples-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
我是否正确实现了源代码?
我认为第一个错误是您将列表发送给期望 Integer
的演员,根据来源输入。一次发送一个整数,例如:
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
然后,fold 在发出结果之前等待源终止。您将 actor 设置为期望 Done
终止,但您正在发送它 Status.Success
。尝试:
actorRef.tell(Done.done(), ActorRef.noSender());
我不是 Akka 专家,但通过这 2 处修改,您的代码会产生预期结果 (1 + 2 + 3)^2 = 36。您永远不会停止 actor 系统,因此程序会永远运行, 但原则是有的。
我不确定您的确切要求,但这是在 Akka Streams:
中流式传输列表的一种更简单的方法Source.from(Arrays.asList(1,2,3))