map 和 Flow 的区别
Difference between map and Flow
从 2016 年阅读 Google 组 post:“.map() 转换为 .via()”
来源:https://groups.google.com/g/akka-user/c/EzHygZpcCHg
以下代码行是否等效:
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
是否存在应该使用地图而不是流的场景?
来源:
请求DTO:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
StreamManager(包含主要方法):
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.CompletionStage;
public class StreamManager {
final static ObjectMapper mapper = new ObjectMapper();
private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
.map(input -> mapper.readValue(input, RequestDto.class))
.log("error");
public static void main(String args[]) {
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
}
}
map
被转换为 via
,但它在语法上并不完全等同 via
,正如您从 Flow.of().map()
.
得到的那样
第一个将转换为 .via(Map(f))
,其中 Map
是 GraphStage
which implements the map operation。
在第二种情况下,mapToDtoFlow
(忽略 log
)本身就是(在 Scala 表示法中)Flow[String].via(Map(f))
,因此您要添加另一层 via
: .via(Flow[String].via(Map(f)))
.
就所有意图和目的而言,它们是相同的(我怀疑实体化器在解释您构建的 RunnableGraph
时会以相同的方式对待它们)。
考虑到 .log
,mapToDtoFlow
等价(同样在 Scala 中):
Flow[String]
.via(Map(f))
.via(Log(...))
Akka Streams 中基本上定义了三个级别,从最高级别到最低级别:
- Java/Scala DSL
- Java/Scala 图 DSL
GraphStage
s
DSL 仅指定构建 GraphStage
的简洁方法,而 link GraphStage
具有 Flow
形状的基本方法是通过 via
操作。
从 2016 年阅读 Google 组 post:“.map() 转换为 .via()”
来源:https://groups.google.com/g/akka-user/c/EzHygZpcCHg
以下代码行是否等效:
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
是否存在应该使用地图而不是流的场景?
来源:
请求DTO:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
StreamManager(包含主要方法):
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.CompletionStage;
public class StreamManager {
final static ObjectMapper mapper = new ObjectMapper();
private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
.map(input -> mapper.readValue(input, RequestDto.class))
.log("error");
public static void main(String args[]) {
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
}
}
map
被转换为 via
,但它在语法上并不完全等同 via
,正如您从 Flow.of().map()
.
第一个将转换为 .via(Map(f))
,其中 Map
是 GraphStage
which implements the map operation。
在第二种情况下,mapToDtoFlow
(忽略 log
)本身就是(在 Scala 表示法中)Flow[String].via(Map(f))
,因此您要添加另一层 via
: .via(Flow[String].via(Map(f)))
.
就所有意图和目的而言,它们是相同的(我怀疑实体化器在解释您构建的 RunnableGraph
时会以相同的方式对待它们)。
考虑到 .log
,mapToDtoFlow
等价(同样在 Scala 中):
Flow[String]
.via(Map(f))
.via(Log(...))
Akka Streams 中基本上定义了三个级别,从最高级别到最低级别:
- Java/Scala DSL
- Java/Scala 图 DSL
GraphStage
s
DSL 仅指定构建 GraphStage
的简洁方法,而 link GraphStage
具有 Flow
形状的基本方法是通过 via
操作。