如何使用 GraphDSL 将流拆分为两个流
How to split a stream into two flows using GraphDSL
我正在学习 akka 流,作为一个例子,我试图根据使用 GraphDSL
.
这是我拼接在一起的完整代码:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import objectmappertest.Request;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
public class FilterObj {
public static void main(String args[]){
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
final String json2 = "{\"unknownField\":\"test\"}";
final ObjectMapper mapper = new ObjectMapper();
final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source =
Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
final Flow<String, Request, NotUsed> flowOperation = Flow.of(String.class)
.map ( input -> {
final JsonNode request = mapper.readTree(input);
if(request.has("datePurchased")){
return mapper.readValue(request.toString() , Request.class);
}
else {
return mapper.readValue(request.toString() , Request.class);
}
}).log("error");
final Sink<Request, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(printSink, (builder, out) -> {
final SourceShape<String> sourceShape = builder.add(source);
final FlowShape<String, Request> flow1Shape = builder.add(flowOperation);
final UniformFanOutShape<String, String> broadcast =
builder.add(Broadcast.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape)
.to(out);
return ClosedShape.getInstance();
})
);
graph.run(actorSystem);
}
}
请求class:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Builder
@ToString
@Jacksonized
public class Request
{
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
运行 上图打印:
17:16:57.129 INFO akka.event.slf4j.Slf4jLogger [NativeMethodAccessorImpl.java:-2] - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
Exception in thread "main" java.lang.IllegalStateException: Illegal GraphDSL usage. Outlets [Broadcast.out1] were not returned in the resulting shape and not connected.
at akka.stream.scaladsl.GraphDSL$Builder.result(Graph.scala:1690)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:1144)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:46)
at akka.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:41)
at akka.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1557)
at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:26)
at akka.stream.javadsl.GraphDSL.create(Graph.scala)
at FilterObj.FilterObj.main(FilterObj.java:70)
为了模拟两种类型的 JSON 有效载荷,我使用以下方法将源合并为一个源:
final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
final String json2 = "{\"unknownField\":\"test\"}";
final ObjectMapper mapper = new ObjectMapper();
final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source =
Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
我不确定如何正确合并逻辑来拆分数据,具体取决于 datePurchased
是否包含在正在解析的 JSON 中,目前它被实现为
if(request.has("datePurchased")){
return mapper.readValue(request.toString() , Request.class);
}
else {
return mapper.readValue(request.toString() , Request.class);
}
我想我需要一个 UniformFanOutShape
但我不确定如何将它合并到 GraphDSL 中。
如果您想拆分数据,则需要 Partition
而不是 Broadcast
。
它们都是扇出形状,但是 Broadcast
将传入消息发送到它的所有输出,Partition
允许您使用自定义逻辑来决定消息应该转到哪个输出......这就是我认为你想要做的。
文档非常好,包含代码示例:
https://doc.akka.io/docs/akka/current/stream/operators/Partition.html
我正在学习 akka 流,作为一个例子,我试图根据使用 GraphDSL
.
这是我拼接在一起的完整代码:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import objectmappertest.Request;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
public class FilterObj {
public static void main(String args[]){
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
final String json2 = "{\"unknownField\":\"test\"}";
final ObjectMapper mapper = new ObjectMapper();
final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source =
Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
final Flow<String, Request, NotUsed> flowOperation = Flow.of(String.class)
.map ( input -> {
final JsonNode request = mapper.readTree(input);
if(request.has("datePurchased")){
return mapper.readValue(request.toString() , Request.class);
}
else {
return mapper.readValue(request.toString() , Request.class);
}
}).log("error");
final Sink<Request, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(printSink, (builder, out) -> {
final SourceShape<String> sourceShape = builder.add(source);
final FlowShape<String, Request> flow1Shape = builder.add(flowOperation);
final UniformFanOutShape<String, String> broadcast =
builder.add(Broadcast.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape)
.to(out);
return ClosedShape.getInstance();
})
);
graph.run(actorSystem);
}
}
请求class:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Builder
@ToString
@Jacksonized
public class Request
{
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
运行 上图打印:
17:16:57.129 INFO akka.event.slf4j.Slf4jLogger [NativeMethodAccessorImpl.java:-2] - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
Exception in thread "main" java.lang.IllegalStateException: Illegal GraphDSL usage. Outlets [Broadcast.out1] were not returned in the resulting shape and not connected.
at akka.stream.scaladsl.GraphDSL$Builder.result(Graph.scala:1690)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:1144)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:46)
at akka.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:41)
at akka.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1557)
at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:26)
at akka.stream.javadsl.GraphDSL.create(Graph.scala)
at FilterObj.FilterObj.main(FilterObj.java:70)
为了模拟两种类型的 JSON 有效载荷,我使用以下方法将源合并为一个源:
final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
final String json2 = "{\"unknownField\":\"test\"}";
final ObjectMapper mapper = new ObjectMapper();
final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source =
Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
我不确定如何正确合并逻辑来拆分数据,具体取决于 datePurchased
是否包含在正在解析的 JSON 中,目前它被实现为
if(request.has("datePurchased")){
return mapper.readValue(request.toString() , Request.class);
}
else {
return mapper.readValue(request.toString() , Request.class);
}
我想我需要一个 UniformFanOutShape
但我不确定如何将它合并到 GraphDSL 中。
如果您想拆分数据,则需要 Partition
而不是 Broadcast
。
它们都是扇出形状,但是 Broadcast
将传入消息发送到它的所有输出,Partition
允许您使用自定义逻辑来决定消息应该转到哪个输出......这就是我认为你想要做的。
文档非常好,包含代码示例: https://doc.akka.io/docs/akka/current/stream/operators/Partition.html