使用 akka 流折叠不能按预期工作
fold not working as expected using akka streams
下面是我编写的代码,用于尝试输出收到的每条 akka 消息的总和,这些消息已根据本指南进行了编辑:
https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class AkkaSourceTesting {
public static void main(String args[]){
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty() , "actorSystem");
Source<Integer, ActorRef> matValuePoweredSource =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
// never fail the stream because of a message
elem -> Optional.empty(),
100,
OverflowStrategy.fail());
Pair<ActorRef, Source<Integer, NotUsed>> actorRefSourcePair =
matValuePoweredSource.preMaterialize(actorSystem);
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
Flow<Integer, Integer, NotUsed> groupedFlow = Flow.of(Integer.class)
.grouped(2)
.map(value -> {
List<Integer> newList = new ArrayList<>(value);
return newList;
})
.mapConcat(value -> value);
// pass source around for materialization
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);
}
}
折叠操作似乎没有在控制台上输出任何内容。
但是如果我使用
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).map(x -> x * 2)).runWith(Sink.foreach(System.out::println), actorSystem);
而不是
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);
然后输出如下:
2
2
我正在尝试对列表进行分组并对每个组执行折叠操作,但折叠甚至没有被执行。我错过了一步吗?
Flow.fold
在上游完成之前不会发出值。
另请注意,您的 groupedFlow
是一个身份流程:它可以在不更改任何内容的情况下删除:
grouped
获取每对连续的元素并将它们捆绑成 List
map
阶段将 List
转换为 ArrayList
mapConcat
展开 ArrayList
并发出元素
在 Java 中最清楚地表达您正在寻找的内容(连续的 2 组对之和的流)可能是
actorRefSourcePair.second()
.grouped(2)
.map(twoList -> twoList.stream().reduce(0, Integer::sum))
.runWith(Sink.foreach(System.out::println), actorSystem);
下面是我编写的代码,用于尝试输出收到的每条 akka 消息的总和,这些消息已根据本指南进行了编辑:
https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class AkkaSourceTesting {
public static void main(String args[]){
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty() , "actorSystem");
Source<Integer, ActorRef> matValuePoweredSource =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
// never fail the stream because of a message
elem -> Optional.empty(),
100,
OverflowStrategy.fail());
Pair<ActorRef, Source<Integer, NotUsed>> actorRefSourcePair =
matValuePoweredSource.preMaterialize(actorSystem);
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
Flow<Integer, Integer, NotUsed> groupedFlow = Flow.of(Integer.class)
.grouped(2)
.map(value -> {
List<Integer> newList = new ArrayList<>(value);
return newList;
})
.mapConcat(value -> value);
// pass source around for materialization
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);
}
}
折叠操作似乎没有在控制台上输出任何内容。
但是如果我使用
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).map(x -> x * 2)).runWith(Sink.foreach(System.out::println), actorSystem);
而不是
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);
然后输出如下:
2
2
我正在尝试对列表进行分组并对每个组执行折叠操作,但折叠甚至没有被执行。我错过了一步吗?
Flow.fold
在上游完成之前不会发出值。
另请注意,您的 groupedFlow
是一个身份流程:它可以在不更改任何内容的情况下删除:
grouped
获取每对连续的元素并将它们捆绑成List
map
阶段将List
转换为ArrayList
mapConcat
展开ArrayList
并发出元素
在 Java 中最清楚地表达您正在寻找的内容(连续的 2 组对之和的流)可能是
actorRefSourcePair.second()
.grouped(2)
.map(twoList -> twoList.stream().reduce(0, Integer::sum))
.runWith(Sink.foreach(System.out::println), actorSystem);