map 和 Flow 的区别

Difference between map and Flow

从 2016 年阅读 Google 组 post:“.map() 转换为 .via()”

来源:https://groups.google.com/g/akka-user/c/EzHygZpcCHg

以下代码行是否等效:

Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

是否存在应该使用地图而不是流的场景?

来源:

请求DTO:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}

StreamManager(包含主要方法):

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.concurrent.CompletionStage;

public class StreamManager {

    final static ObjectMapper mapper = new ObjectMapper();

    private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
            .map(input -> mapper.readValue(input, RequestDto.class))
            .log("error");

    public static void main(String args[]) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";

        Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
        Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

    }

}

map 被转换为 via,但它在语法上并不完全等同 via,正如您从 Flow.of().map().

得到的那样

第一个将转换为 .via(Map(f)),其中 MapGraphStage which implements the map operation

在第二种情况下,mapToDtoFlow(忽略 log)本身就是(在 Scala 表示法中)Flow[String].via(Map(f)),因此您要添加另一层 via: .via(Flow[String].via(Map(f))).

就所有意图和目的而言,它们是相同的(我怀疑实体化器在解释您构建的 RunnableGraph 时会以相同的方式对待它们)。

考虑到 .logmapToDtoFlow 等价(同样在 Scala 中):

Flow[String]
  .via(Map(f))
  .via(Log(...))

Akka Streams 中基本上定义了三个级别,从最高级别到最低级别:

  • Java/Scala DSL
  • Java/Scala 图 DSL
  • GraphStages

DSL 仅指定构建 GraphStage 的简洁方法,而 link GraphStage 具有 Flow 形状的基本方法是通过 via操作。