合并流中的 2 个源

Merging 2 sources in streams

我使用 final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create); 来合并两个来源。阅读 https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html 没有提供合并 > 2 个来源的示例,因此我不确定使用 null 是否是合并 2 个来源的正确方法。

当我 运行 时,下面的代码 100.0 不断打印。每个源计算滑动 window 值的平均值,其中每个 window 大小为 3。每个源之间的差异是 source1 利用和 source2 利用 10。但是 source2 没有被执行为

sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.

如何正确组合 source1source2 以便执行每个源代码?

来源:

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;
import java.util.concurrent.CompletionStage;


public class MultipleStreams {

    public static void main(String args[]) {

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        final String json1 = "100";
        Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    final Double b = x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                    return b;
                });

        final String json2 = "10";
        final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    return x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                });

        final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);

        sources.to(printSink).run(actorSystem);

    }
}

Concat 尝试首先清空第一个来源。

将其更改为 Merge 给出输出

100.0
10.0
100.0
10.0
100.0
10.0
100.0
10.0
10.0
100.0
10.0