使用 Akka Graphs 实现简单的架构
Implement simple architecture using Akka Graphs
我正在尝试设置一个简单的图形结构,通过调用 rest 服务来处理数据,在转发结果之前将每个服务的结果转发给中间处理单元。这是一个高级架构:
这可以使用 Akka 图形流来定义吗?正在阅读 https://doc.akka.io/docs/akka/current/stream/stream-graphs.html 我什至不明白如何实现这个简单的架构。
我尝试实现自定义代码以在图形中执行函数:
package com.graph
class RestG {
def flow (in : String) : String = {
return in + "extra"
}
}
object RestG {
case class Flow(in: String) {
def out : String = in+"out"
}
def main(args: Array[String]): Unit = {
List(new RestG().flow("test") , new RestG().flow("test2")).foreach(println)
}
}
我不确定如何在函数之间发送数据。所以我想我应该使用 Akka Graphs 但如何实现上面的架构?
下面是我将如何解决这个问题。首先是一些类型:
type Data = Int
type RestService1Response = String
type RestService2Response = String
type DisplayedResult = Boolean
然后存根函数异步调用外部服务:
def callRestService1(data: Data): Future[RestService1Response] = ???
def callRestService2(data: Data): Future[RestService2Response] = ???
def resultCombiner(resp1: RestService1Response, resp2: RestService2Response): DisplayedResult = ???
现在是 Akka 流(我没有设置 ActorSystem
等)
import akka.Done
import akka.stream.scaladsl._
type SourceMatVal = Any
val dataSource: Source[Data, SourceMatVal] = ???
def restServiceFlow[Response](callF: Data => Future[Data, Response], maxInflight: Int) = Flow[Data].mapAsync(maxInflight)(callF)
// NB: since we're fanning out, there's no reason to have different maxInflights here...
val service1 = restServiceFlow(callRestService1, 4)
val service2 = restServiceFlow(callRestService2, 4)
val downstream = Flow[(RestService1Response, RestService2Response)]
.map((resultCombiner _).tupled)
val splitAndCombine = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val fanOut = b.add(Broadcast[Data](2))
val fanIn = b.add(Zip[RestService1Response, RestService2Response])
fanOut.out(0).via(service1) ~> fanIn.in0
fanOut.out(1).via(service2) ~> fanIn.in1
FlowShape(fanOut.in, fanIn.out)
}
// This future will complete with a `Done` if/when the stream completes
val future: Future[Done] = dataSource
.via(splitAndCombine)
.via(downstream)
.runForeach { displayableData =>
??? // Display the data
}
可以在 Graph DSL 中完成所有连接,但我通常更喜欢让我的图形阶段尽可能简单,并且只在 Source
/[= 上的标准方法范围内使用它们15=]/Sink
不能如愿以偿
我正在尝试设置一个简单的图形结构,通过调用 rest 服务来处理数据,在转发结果之前将每个服务的结果转发给中间处理单元。这是一个高级架构:
这可以使用 Akka 图形流来定义吗?正在阅读 https://doc.akka.io/docs/akka/current/stream/stream-graphs.html 我什至不明白如何实现这个简单的架构。
我尝试实现自定义代码以在图形中执行函数:
package com.graph
class RestG {
def flow (in : String) : String = {
return in + "extra"
}
}
object RestG {
case class Flow(in: String) {
def out : String = in+"out"
}
def main(args: Array[String]): Unit = {
List(new RestG().flow("test") , new RestG().flow("test2")).foreach(println)
}
}
我不确定如何在函数之间发送数据。所以我想我应该使用 Akka Graphs 但如何实现上面的架构?
下面是我将如何解决这个问题。首先是一些类型:
type Data = Int
type RestService1Response = String
type RestService2Response = String
type DisplayedResult = Boolean
然后存根函数异步调用外部服务:
def callRestService1(data: Data): Future[RestService1Response] = ???
def callRestService2(data: Data): Future[RestService2Response] = ???
def resultCombiner(resp1: RestService1Response, resp2: RestService2Response): DisplayedResult = ???
现在是 Akka 流(我没有设置 ActorSystem
等)
import akka.Done
import akka.stream.scaladsl._
type SourceMatVal = Any
val dataSource: Source[Data, SourceMatVal] = ???
def restServiceFlow[Response](callF: Data => Future[Data, Response], maxInflight: Int) = Flow[Data].mapAsync(maxInflight)(callF)
// NB: since we're fanning out, there's no reason to have different maxInflights here...
val service1 = restServiceFlow(callRestService1, 4)
val service2 = restServiceFlow(callRestService2, 4)
val downstream = Flow[(RestService1Response, RestService2Response)]
.map((resultCombiner _).tupled)
val splitAndCombine = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val fanOut = b.add(Broadcast[Data](2))
val fanIn = b.add(Zip[RestService1Response, RestService2Response])
fanOut.out(0).via(service1) ~> fanIn.in0
fanOut.out(1).via(service2) ~> fanIn.in1
FlowShape(fanOut.in, fanIn.out)
}
// This future will complete with a `Done` if/when the stream completes
val future: Future[Done] = dataSource
.via(splitAndCombine)
.via(downstream)
.runForeach { displayableData =>
??? // Display the data
}
可以在 Graph DSL 中完成所有连接,但我通常更喜欢让我的图形阶段尽可能简单,并且只在 Source
/[= 上的标准方法范围内使用它们15=]/Sink
不能如愿以偿