如何使用 GraphDSL 将流拆分为两个流

How to split a stream into two flows using GraphDSL

我正在学习 akka 流,作为一个例子,我试图根据使用 GraphDSL.

这是我拼接在一起的完整代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import objectmappertest.Request;
import java.util.Collections;
import java.util.concurrent.CompletionStage;

public class FilterObj {

    public static void main(String args[]){

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");

        final String json1  = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
        final String json2  = "{\"unknownField\":\"test\"}";

        final ObjectMapper mapper = new ObjectMapper();

        final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
        final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
        final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);

        final Source<String, NotUsed> source =
                Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);

        final Flow<String, Request, NotUsed> flowOperation = Flow.of(String.class)
                .map ( input -> {
                    final JsonNode request = mapper.readTree(input);
                    if(request.has("datePurchased")){
                        return mapper.readValue(request.toString() , Request.class);
                    }
                    else {
                        return mapper.readValue(request.toString() , Request.class);
                    }

                }).log("error");

        final Sink<Request, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(printSink, (builder, out) -> {
                    final SourceShape<String> sourceShape = builder.add(source);
                    final FlowShape<String, Request> flow1Shape = builder.add(flowOperation);

                    final UniformFanOutShape<String, String> broadcast =
                            builder.add(Broadcast.create(2));
                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape)
                            .to(out);

                    return ClosedShape.getInstance();
                })
        );
        graph.run(actorSystem);
        
    }
}

请求class:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;

@Builder
@ToString
@Jacksonized
public class Request
{
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}

运行 上图打印:

17:16:57.129 INFO  akka.event.slf4j.Slf4jLogger  [NativeMethodAccessorImpl.java:-2] - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
Exception in thread "main" java.lang.IllegalStateException: Illegal GraphDSL usage. Outlets [Broadcast.out1] were not returned in the resulting shape and not connected.
    at akka.stream.scaladsl.GraphDSL$Builder.result(Graph.scala:1690)
    at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:1144)
    at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:46)
    at akka.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:41)
    at akka.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1557)
    at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:26)
    at akka.stream.javadsl.GraphDSL.create(Graph.scala)
    at FilterObj.FilterObj.main(FilterObj.java:70)

为了模拟两种类型的 JSON 有效载荷,我使用以下方法将源合并为一个源:

    final String json1  = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
    final String json2  = "{\"unknownField\":\"test\"}";

    final ObjectMapper mapper = new ObjectMapper();

    final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
    final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
    final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);

    final Source<String, NotUsed> source =
            Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);

我不确定如何正确合并逻辑来拆分数据,具体取决于 datePurchased 是否包含在正在解析的 JSON 中,目前它被实现为

                if(request.has("datePurchased")){
                    return mapper.readValue(request.toString() , Request.class);
                }
                else {
                    return mapper.readValue(request.toString() , Request.class);
                }

我想我需要一个 UniformFanOutShape 但我不确定如何将它合并到 GraphDSL 中。

如果您想拆分数据,则需要 Partition 而不是 Broadcast

它们都是扇出形状,但是 Broadcast 将传入消息发送到它的所有输出,Partition 允许您使用自定义逻辑来决定消息应该转到哪个输出......这就是我认为你想要做的。

文档非常好,包含代码示例: https://doc.akka.io/docs/akka/current/stream/operators/Partition.html