在 akka 流中的操作之间添加状态

Adding state between operations within akka stream

下面是我用来计算对象列表中数据流平均值的代码:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class sd001 {

    private static final ActorSystem system = ActorSystem.create("akkassembly");
    private static List<RData> ls = new ArrayList();

    private static class RData {
        private String id;

        public RData(String id){
            this.id = id;
        }

        public List<Integer> getValues(){
            if(this.id.equalsIgnoreCase("1")) {
                return Arrays.asList(1, 2, 3, 4, 5);
            }
            else {
                return Arrays.asList(1, 2, 3);
            }
        }

        public String getId() {
            return this.id;
        }
    }

    final static List<RData> builderFunction() {
        try {
            ls.add(new RData("1"));
            ls.add(new RData("2"));
            ls.add(new RData("3"));
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ls;
    }

    private static double calculateAverage(List <Integer> marks) {
        return marks.stream()
                .mapToDouble(d -> d)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

        final Source<List<RData>, NotUsed> source2 =
                Source.repeat(NotUsed.getInstance()).map(elem -> builderFunction());

                source2.mapConcat(i -> i)
                .groupBy(3 , x -> x.getId())
                .map(v -> calculateAverage(v.getValues()))
                .to(Sink.foreach(x -> System.out.println(x)))
                .run(system);

    }

}

结果输出:

11:55:27.477 [akkassembly-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
3.0
2.0
2.0
3.0

所以似乎按预期工作。

我使用 groupBy 方法 (https://doc.akka.io/docs/akka/current/stream/stream-substream.html) 按关联的 id 值对 List 项目进行分组。如何将 id 值添加到输出平均值的阶段,以便不仅输出平均值,还将 id 打印到屏幕上?我指的阶段是:

.to(Sink.foreach(x -> System.out.println(x)))

一个可能的解决方案是修改方法 getValues 并创建一个新参数 id 和 return 除了平均值之外 id,这将允许为 Sink 访问 println 中的值。这个解决方案似乎过于复杂。看来我需要在 mapto 函数之间携带一个额外的状态(在本例中为 id 吗?

一般来说,Akka Streams 中的阶段不共享状态:它们只在它们之间传递流的元素。因此,在流的各个阶段之间传递状态的唯一通用方法是将状态嵌入到传递的元素中。

在某些情况下,可以使用 SourceWithContext/FlowWithContext:

Essentially, a FlowWithContext is just a Flow that contains tuples of element and context, but the advantage is in the operators: most operators on FlowWithContext will work on the element rather than on the tuple, allowing you to focus on your application logic rather without worrying about the context.

在这种特殊情况下,由于 groupBy 正在执行类似于重新排序元素的操作,FlowWithContext 不支持 groupBy,因此您必须将 ID 嵌入到流元素...

(...除非您想深入自定义图形阶段的深处,这可能会使将 ID 嵌入到流元素中的复杂性相形见绌。)