使用 akka 流有条件地跳过流
Conditionally skip flow using akka streams
我正在使用 akka 流,我有一段图表需要有条件地跳过,因为流无法处理某些值。具体来说,我有一个流程,它接受一个字符串并发出 http 请求,但是服务器无法处理字符串为空的情况。但我只需要 return 一个空字符串。有没有一种方法可以做到这一点而不必通过 http 请求知道它会失败?我基本上有这个:
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
我唯一能想到的就是在我的 httpResponse 流中捕获 400 错误并return设置一个默认值。但是我希望能够避免为我知道会事先失败的请求而访问服务器的开销。
您可以使用 flatMapConcat
:
(警告:从未编译过,但您会明白其中的要点)
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val makeHttpCall: Flow[HttpRequest, HttpResponse, _]
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
val cleanerSource = source.flatMapConcat({
case "" => emptyStringSource
case other => Source.single(other) via someHttpTransformation
})
Viktor Klang 的解决方案简洁明了 elegant。我只是想演示一个使用图形的替代方案。
您可以将字符串源分成两个流,并过滤一个流以获取有效字符串,另一个流以过滤无效字符串。然后合并结果("cross the streams").
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val source = Source(List("1", "2", "", "3", "4"))
val sink : Sink[String,_] = ???
val bcast = builder.add(Broadcast[String](2))
val merge = builder.add(Merge[String](2))
val validReq = Flow[String].filter(_.size > 0)
val invalidReq = Flow[String].filter(_.size == 0)
val httpRequest: Flow[String, HttpRequest, _] = ???
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
val httpResponse: Flow[HttpResponse, String, _] = ???
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
bcast ~> invalidReq ~> merge
ClosedShape
})
注意:此解决方案拆分了流,因此 Sink 处理字符串值结果的顺序可能与基于输入的预期顺序不同。
我正在使用 akka 流,我有一段图表需要有条件地跳过,因为流无法处理某些值。具体来说,我有一个流程,它接受一个字符串并发出 http 请求,但是服务器无法处理字符串为空的情况。但我只需要 return 一个空字符串。有没有一种方法可以做到这一点而不必通过 http 请求知道它会失败?我基本上有这个:
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
我唯一能想到的就是在我的 httpResponse 流中捕获 400 错误并return设置一个默认值。但是我希望能够避免为我知道会事先失败的请求而访问服务器的开销。
您可以使用 flatMapConcat
:
(警告:从未编译过,但您会明白其中的要点)
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val makeHttpCall: Flow[HttpRequest, HttpResponse, _]
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
val cleanerSource = source.flatMapConcat({
case "" => emptyStringSource
case other => Source.single(other) via someHttpTransformation
})
Viktor Klang 的解决方案简洁明了 elegant。我只是想演示一个使用图形的替代方案。
您可以将字符串源分成两个流,并过滤一个流以获取有效字符串,另一个流以过滤无效字符串。然后合并结果("cross the streams").
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val source = Source(List("1", "2", "", "3", "4"))
val sink : Sink[String,_] = ???
val bcast = builder.add(Broadcast[String](2))
val merge = builder.add(Merge[String](2))
val validReq = Flow[String].filter(_.size > 0)
val invalidReq = Flow[String].filter(_.size == 0)
val httpRequest: Flow[String, HttpRequest, _] = ???
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
val httpResponse: Flow[HttpResponse, String, _] = ???
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
bcast ~> invalidReq ~> merge
ClosedShape
})
注意:此解决方案拆分了流,因此 Sink 处理字符串值结果的顺序可能与基于输入的预期顺序不同。