用于分离 akka 流的模式

Pattern for separating akka streams Flows

我正在参加 Akka Streams Udemy 课程,我提供了以下代码:

package multiflow;

import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.SourceShape;
import akka.stream.javadsl.*;

import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class Main {

    public static void main(String[] args) {
        Map<Integer, VehiclePositionMessage> vehicleTrackingMap = new HashMap<>();
        for (int i = 1; i <=8; i++) {
            vehicleTrackingMap.put(i, new VehiclePositionMessage(1, new Date(), 0,0));
        }

        //source - repeat some value every 10 seconds.
        Source<String, NotUsed> source = Source.repeat("go").throttle(1, Duration.ofSeconds(10));

        //flow 1 - transform into the ids of each van (ie 1..8) with mapConcat
        Flow<String, Integer, NotUsed> vehicleIds = Flow.of(String.class)
                .mapConcat(value -> List.of(1,2,3,4,5,6,7,8));

        //flow 2 - get position for each van as a VPMs with a call to the lookup method (create a new instance of
        //utility functions each time). Note that this process isn't instant so should be run in parallel.
        Flow<Integer, VehiclePositionMessage, NotUsed> vehiclePostions = Flow.of(Integer.class)
                .mapAsyncUnordered(8, vehicleId -> {
                    System.out.println("Requesting Position for vehicle " + vehicleId);
                    CompletableFuture<VehiclePositionMessage> future = new CompletableFuture<>();
                    UtilityFunctions utilityFunctions = new UtilityFunctions();
                    future.completeAsync( () -> utilityFunctions.getVehiclePosition(vehicleId));
                    return future;
                });

        //flow 3 - use previous position from the map to calculate the current speed of each vehicle. Replace the
        // position in the map with the newest position and pass the current speed downstream
        Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
                .map ( vpm -> {
                    UtilityFunctions utilityFunctions = new UtilityFunctions();
                    VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
                    VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
                    System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
                    vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
                    return speed;
                });

        //flow 4 - filter to only keep those values with a speed > 95
        Flow<VehicleSpeed, VehicleSpeed, NotUsed> speedFilter = Flow.of(VehicleSpeed.class)
                .filter(speed -> speed.getSpeed() > 95);

        //sink - as soon as 1 value is received return it as a materialized value, and terminate the stream
        Sink<VehicleSpeed, CompletionStage<VehicleSpeed>> sink = Sink.head();

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");

//        CompletionStage<VehicleSpeed> result = source.via(vehicleIds)
//                .async()
//                .via(vehiclePostions)
//                .async()
//                .via(vehicleSpeeds)
//                .via(speedFilter)
//                .toMat(sink, Keep.right())
//                .run(actorSystem);

        RunnableGraph<CompletionStage<VehicleSpeed>> graph = RunnableGraph.fromGraph(
                GraphDSL.create( sink, (builder, out) -> {

                    SourceShape<String> sourceShape = builder.add(Source.repeat("go")
                            .throttle(1, Duration.ofSeconds(10)));
                    //SourceShape<String> sourceShape = builder.add(source);

                    FlowShape<String,Integer> vehicleIdsShape = builder.add(vehicleIds);
                    FlowShape<Integer, VehiclePositionMessage> vehiclePositionsShape =
                            builder.add(vehiclePostions.async());
                    FlowShape<VehiclePositionMessage, VehicleSpeed> vehicleSpeedsShape =
                            builder.add(vehicleSpeeds);
                    FlowShape<VehicleSpeed, VehicleSpeed> speedFilterShape =
                            builder.add(speedFilter);

                    //DON'T NEED TO DO THIS - OUT IS OUR SINKSHAPE
                    //SinkShape<VehicleSpeed> out = builder.add(sink);

                    builder.from(sourceShape)
                    .via(vehicleIdsShape)
                    .via(vehiclePositionsShape);

                    builder.from(vehicleSpeedsShape)
                    .via(speedFilterShape)
                    .to(out);

                    builder.from(vehiclePositionsShape)
                    .via(vehicleSpeedsShape);

                    return ClosedShape.getInstance();
                })

        );

        CompletionStage<VehicleSpeed> result = graph.run(actorSystem);

        result.whenComplete( (value, throwable) -> {
            if (throwable != null) {
                System.out.println("Something went wrong " + throwable);
            }
            else {
                System.out.println("Vehicle " + value.getVehicleId() + " was going at a speed of " + value.getSpeed());
            }
            actorSystem.terminate();
        }) ;
    }

}

这不是家庭作业的一部分,但根据我自己的理解,我正在尝试简化流链接在一起的方式。

我创建了这个新的 class :

import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;

import java.util.Map;


public class FlowBuilder {

    public static Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> builder(Map<Integer, VehiclePositionMessage> vehicleTrackingMap) {

        Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
                .map ( vpm -> {
                    UtilityFunctions utilityFunctions = new UtilityFunctions();
                    VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
                    VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
                    System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
                    vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
                    return speed;
                });


        return vehicleSpeeds;
    }

}

它的名字是这样的:

Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder(vehicleTrackingMap);

在功能上这按预期工作,但是否有替代方法 method/pattern 可以处理多个流程并且应该将它们结合起来。我认为使用静态是一种反模式 ?

调用构建器的更新方法:

   Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder()
           .vehicleTrackingMap(vehicleTrackingMap).build().getFLow();

流程已修改为使用构建器:

import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import lombok.ToString;

import java.util.Map;

@Builder
@ToString
public class FlowBuilder {

    Map<Integer, VehiclePositionMessage> vehicleTrackingMap;

    public Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> getFLow() {

        Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
                .map ( vpm -> {
                    UtilityFunctions utilityFunctions = new UtilityFunctions();
                    VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
                    VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
                    System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
                    vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
                    return speed;
                });


        return vehicleSpeeds;
    }

}

不确定您要在此处简化什么,但如果您想避免在一个方法中定义所有流程,只需将每个流程包装在同一个 class 中的自己的方法中。如果您制作这些方法 public,则可以分别对每个 Flow 进行单元测试。所以类似的东西

public class MyStream {
  public void runStream() {
    var myMap = new HashMap<???,???>()
    mySource()
      .via(myFlow1())
      .via(myFlow2(myMap))
      .run(....); 
  }

  public Source<???,???> mySource() { ??? }
  public Flow<???,???,???> myFlow1() { ??? }
  public Flow<???,???,???> myFlow2(Map<???, ???> stuffYouNeedForFlow) {???}
}

现在每个 Flow 都可以使用此处描述的测试技术进行测试 https://doc.akka.io/docs/akka/current/stream/stream-testkit.html