用于分离 akka 流的模式
Pattern for separating akka streams Flows
我正在参加 Akka Streams Udemy 课程,我提供了以下代码:
package multiflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.SourceShape;
import akka.stream.javadsl.*;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class Main {
public static void main(String[] args) {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap = new HashMap<>();
for (int i = 1; i <=8; i++) {
vehicleTrackingMap.put(i, new VehiclePositionMessage(1, new Date(), 0,0));
}
//source - repeat some value every 10 seconds.
Source<String, NotUsed> source = Source.repeat("go").throttle(1, Duration.ofSeconds(10));
//flow 1 - transform into the ids of each van (ie 1..8) with mapConcat
Flow<String, Integer, NotUsed> vehicleIds = Flow.of(String.class)
.mapConcat(value -> List.of(1,2,3,4,5,6,7,8));
//flow 2 - get position for each van as a VPMs with a call to the lookup method (create a new instance of
//utility functions each time). Note that this process isn't instant so should be run in parallel.
Flow<Integer, VehiclePositionMessage, NotUsed> vehiclePostions = Flow.of(Integer.class)
.mapAsyncUnordered(8, vehicleId -> {
System.out.println("Requesting Position for vehicle " + vehicleId);
CompletableFuture<VehiclePositionMessage> future = new CompletableFuture<>();
UtilityFunctions utilityFunctions = new UtilityFunctions();
future.completeAsync( () -> utilityFunctions.getVehiclePosition(vehicleId));
return future;
});
//flow 3 - use previous position from the map to calculate the current speed of each vehicle. Replace the
// position in the map with the newest position and pass the current speed downstream
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
//flow 4 - filter to only keep those values with a speed > 95
Flow<VehicleSpeed, VehicleSpeed, NotUsed> speedFilter = Flow.of(VehicleSpeed.class)
.filter(speed -> speed.getSpeed() > 95);
//sink - as soon as 1 value is received return it as a materialized value, and terminate the stream
Sink<VehicleSpeed, CompletionStage<VehicleSpeed>> sink = Sink.head();
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
// CompletionStage<VehicleSpeed> result = source.via(vehicleIds)
// .async()
// .via(vehiclePostions)
// .async()
// .via(vehicleSpeeds)
// .via(speedFilter)
// .toMat(sink, Keep.right())
// .run(actorSystem);
RunnableGraph<CompletionStage<VehicleSpeed>> graph = RunnableGraph.fromGraph(
GraphDSL.create( sink, (builder, out) -> {
SourceShape<String> sourceShape = builder.add(Source.repeat("go")
.throttle(1, Duration.ofSeconds(10)));
//SourceShape<String> sourceShape = builder.add(source);
FlowShape<String,Integer> vehicleIdsShape = builder.add(vehicleIds);
FlowShape<Integer, VehiclePositionMessage> vehiclePositionsShape =
builder.add(vehiclePostions.async());
FlowShape<VehiclePositionMessage, VehicleSpeed> vehicleSpeedsShape =
builder.add(vehicleSpeeds);
FlowShape<VehicleSpeed, VehicleSpeed> speedFilterShape =
builder.add(speedFilter);
//DON'T NEED TO DO THIS - OUT IS OUR SINKSHAPE
//SinkShape<VehicleSpeed> out = builder.add(sink);
builder.from(sourceShape)
.via(vehicleIdsShape)
.via(vehiclePositionsShape);
builder.from(vehicleSpeedsShape)
.via(speedFilterShape)
.to(out);
builder.from(vehiclePositionsShape)
.via(vehicleSpeedsShape);
return ClosedShape.getInstance();
})
);
CompletionStage<VehicleSpeed> result = graph.run(actorSystem);
result.whenComplete( (value, throwable) -> {
if (throwable != null) {
System.out.println("Something went wrong " + throwable);
}
else {
System.out.println("Vehicle " + value.getVehicleId() + " was going at a speed of " + value.getSpeed());
}
actorSystem.terminate();
}) ;
}
}
这不是家庭作业的一部分,但根据我自己的理解,我正在尝试简化流链接在一起的方式。
我创建了这个新的 class :
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import java.util.Map;
public class FlowBuilder {
public static Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> builder(Map<Integer, VehiclePositionMessage> vehicleTrackingMap) {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
它的名字是这样的:
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder(vehicleTrackingMap);
在功能上这按预期工作,但是否有替代方法 method/pattern 可以处理多个流程并且应该将它们结合起来。我认为使用静态是一种反模式 ?
调用构建器的更新方法:
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder()
.vehicleTrackingMap(vehicleTrackingMap).build().getFLow();
流程已修改为使用构建器:
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import lombok.ToString;
import java.util.Map;
@Builder
@ToString
public class FlowBuilder {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap;
public Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> getFLow() {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
不确定您要在此处简化什么,但如果您想避免在一个方法中定义所有流程,只需将每个流程包装在同一个 class 中的自己的方法中。如果您制作这些方法 public,则可以分别对每个 Flow
进行单元测试。所以类似的东西
public class MyStream {
public void runStream() {
var myMap = new HashMap<???,???>()
mySource()
.via(myFlow1())
.via(myFlow2(myMap))
.run(....);
}
public Source<???,???> mySource() { ??? }
public Flow<???,???,???> myFlow1() { ??? }
public Flow<???,???,???> myFlow2(Map<???, ???> stuffYouNeedForFlow) {???}
}
现在每个 Flow
都可以使用此处描述的测试技术进行测试 https://doc.akka.io/docs/akka/current/stream/stream-testkit.html
我正在参加 Akka Streams Udemy 课程,我提供了以下代码:
package multiflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.SourceShape;
import akka.stream.javadsl.*;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class Main {
public static void main(String[] args) {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap = new HashMap<>();
for (int i = 1; i <=8; i++) {
vehicleTrackingMap.put(i, new VehiclePositionMessage(1, new Date(), 0,0));
}
//source - repeat some value every 10 seconds.
Source<String, NotUsed> source = Source.repeat("go").throttle(1, Duration.ofSeconds(10));
//flow 1 - transform into the ids of each van (ie 1..8) with mapConcat
Flow<String, Integer, NotUsed> vehicleIds = Flow.of(String.class)
.mapConcat(value -> List.of(1,2,3,4,5,6,7,8));
//flow 2 - get position for each van as a VPMs with a call to the lookup method (create a new instance of
//utility functions each time). Note that this process isn't instant so should be run in parallel.
Flow<Integer, VehiclePositionMessage, NotUsed> vehiclePostions = Flow.of(Integer.class)
.mapAsyncUnordered(8, vehicleId -> {
System.out.println("Requesting Position for vehicle " + vehicleId);
CompletableFuture<VehiclePositionMessage> future = new CompletableFuture<>();
UtilityFunctions utilityFunctions = new UtilityFunctions();
future.completeAsync( () -> utilityFunctions.getVehiclePosition(vehicleId));
return future;
});
//flow 3 - use previous position from the map to calculate the current speed of each vehicle. Replace the
// position in the map with the newest position and pass the current speed downstream
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
//flow 4 - filter to only keep those values with a speed > 95
Flow<VehicleSpeed, VehicleSpeed, NotUsed> speedFilter = Flow.of(VehicleSpeed.class)
.filter(speed -> speed.getSpeed() > 95);
//sink - as soon as 1 value is received return it as a materialized value, and terminate the stream
Sink<VehicleSpeed, CompletionStage<VehicleSpeed>> sink = Sink.head();
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
// CompletionStage<VehicleSpeed> result = source.via(vehicleIds)
// .async()
// .via(vehiclePostions)
// .async()
// .via(vehicleSpeeds)
// .via(speedFilter)
// .toMat(sink, Keep.right())
// .run(actorSystem);
RunnableGraph<CompletionStage<VehicleSpeed>> graph = RunnableGraph.fromGraph(
GraphDSL.create( sink, (builder, out) -> {
SourceShape<String> sourceShape = builder.add(Source.repeat("go")
.throttle(1, Duration.ofSeconds(10)));
//SourceShape<String> sourceShape = builder.add(source);
FlowShape<String,Integer> vehicleIdsShape = builder.add(vehicleIds);
FlowShape<Integer, VehiclePositionMessage> vehiclePositionsShape =
builder.add(vehiclePostions.async());
FlowShape<VehiclePositionMessage, VehicleSpeed> vehicleSpeedsShape =
builder.add(vehicleSpeeds);
FlowShape<VehicleSpeed, VehicleSpeed> speedFilterShape =
builder.add(speedFilter);
//DON'T NEED TO DO THIS - OUT IS OUR SINKSHAPE
//SinkShape<VehicleSpeed> out = builder.add(sink);
builder.from(sourceShape)
.via(vehicleIdsShape)
.via(vehiclePositionsShape);
builder.from(vehicleSpeedsShape)
.via(speedFilterShape)
.to(out);
builder.from(vehiclePositionsShape)
.via(vehicleSpeedsShape);
return ClosedShape.getInstance();
})
);
CompletionStage<VehicleSpeed> result = graph.run(actorSystem);
result.whenComplete( (value, throwable) -> {
if (throwable != null) {
System.out.println("Something went wrong " + throwable);
}
else {
System.out.println("Vehicle " + value.getVehicleId() + " was going at a speed of " + value.getSpeed());
}
actorSystem.terminate();
}) ;
}
}
这不是家庭作业的一部分,但根据我自己的理解,我正在尝试简化流链接在一起的方式。
我创建了这个新的 class :
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import java.util.Map;
public class FlowBuilder {
public static Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> builder(Map<Integer, VehiclePositionMessage> vehicleTrackingMap) {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
它的名字是这样的:
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder(vehicleTrackingMap);
在功能上这按预期工作,但是否有替代方法 method/pattern 可以处理多个流程并且应该将它们结合起来。我认为使用静态是一种反模式 ?
调用构建器的更新方法:
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = FlowBuilder.builder()
.vehicleTrackingMap(vehicleTrackingMap).build().getFLow();
流程已修改为使用构建器:
import akka.NotUsed;
import akka.stream.javadsl.Flow;
import lombok.Builder;
import lombok.ToString;
import java.util.Map;
@Builder
@ToString
public class FlowBuilder {
Map<Integer, VehiclePositionMessage> vehicleTrackingMap;
public Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> getFLow() {
Flow<VehiclePositionMessage, VehicleSpeed, NotUsed> vehicleSpeeds = Flow.of(VehiclePositionMessage.class)
.map ( vpm -> {
UtilityFunctions utilityFunctions = new UtilityFunctions();
VehiclePositionMessage previousVpm = vehicleTrackingMap.get(vpm.getVehicleId());
VehicleSpeed speed = utilityFunctions.calculateSpeed(vpm, previousVpm);
System.out.println("Vehicle " + vpm.getVehicleId() + " is travelling at " + speed.getSpeed());
vehicleTrackingMap.put(vpm.getVehicleId(), vpm);
return speed;
});
return vehicleSpeeds;
}
}
不确定您要在此处简化什么,但如果您想避免在一个方法中定义所有流程,只需将每个流程包装在同一个 class 中的自己的方法中。如果您制作这些方法 public,则可以分别对每个 Flow
进行单元测试。所以类似的东西
public class MyStream {
public void runStream() {
var myMap = new HashMap<???,???>()
mySource()
.via(myFlow1())
.via(myFlow2(myMap))
.run(....);
}
public Source<???,???> mySource() { ??? }
public Flow<???,???,???> myFlow1() { ??? }
public Flow<???,???,???> myFlow2(Map<???, ???> stuffYouNeedForFlow) {???}
}
现在每个 Flow
都可以使用此处描述的测试技术进行测试 https://doc.akka.io/docs/akka/current/stream/stream-testkit.html