合并流中的 2 个源
Merging 2 sources in streams
我使用 final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
来合并两个来源。阅读 https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html 没有提供合并 > 2 个来源的示例,因此我不确定使用 null 是否是合并 2 个来源的正确方法。
当我 运行 时,下面的代码 100.0 不断打印。每个源计算滑动 window 值的平均值,其中每个 window 大小为 3。每个源之间的差异是 source1
利用和 source2
利用 10。但是 source2
没有被执行为
sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.
如何正确组合 source1
和 source2
以便执行每个源代码?
来源:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class MultipleStreams {
public static void main(String args[]) {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");
final String json1 = "100";
Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
final Double b = x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
return b;
});
final String json2 = "10";
final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
return x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
});
final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
sources.to(printSink).run(actorSystem);
}
}
Concat
尝试首先清空第一个来源。
将其更改为 Merge
给出输出
100.0
10.0
100.0
10.0
100.0
10.0
100.0
10.0
10.0
100.0
10.0
我使用 final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
来合并两个来源。阅读 https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html 没有提供合并 > 2 个来源的示例,因此我不确定使用 null 是否是合并 2 个来源的正确方法。
当我 运行 时,下面的代码 100.0 不断打印。每个源计算滑动 window 值的平均值,其中每个 window 大小为 3。每个源之间的差异是 source1
利用和 source2
利用 10。但是 source2
没有被执行为
sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.
如何正确组合 source1
和 source2
以便执行每个源代码?
来源:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class MultipleStreams {
public static void main(String args[]) {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");
final String json1 = "100";
Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
final Double b = x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
return b;
});
final String json2 = "10";
final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
return x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
});
final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
sources.to(printSink).run(actorSystem);
}
}
Concat
尝试首先清空第一个来源。
将其更改为 Merge
给出输出
100.0
10.0
100.0
10.0
100.0
10.0
100.0
10.0
10.0
100.0
10.0