重用 akka-stream 流的优雅方式
Elegant way of reusing akka-stream flows
我正在寻找一种轻松重用 akka-stream 流的方法。
我将打算重用的 Flow 视为函数,因此我想保留其签名:
Flow[Input, Output, NotUsed]
现在,当我使用此流程时,我希望能够 'call' 此流程并将结果放在一边以供进一步处理。
所以我想从 Flow emitting [Input]
开始,应用我的 flow,然后继续 Flow emitting [(Input, Output)]
。
示例:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
现在这不可能以直接的方式实现,因为将流与 .via()
结合起来会让我流发出 [Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
另一种方法是让我的可重用流程发出 [(Input, Output)]
但这需要每个流程将其输入推送到所有阶段并使我的代码看起来很糟糕。
所以我想出了这样一个组合器:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
}
将输入广播到流中,并直接在平行线中广播 -> 都到 'Zip' 阶段,在该阶段我将值连接到一个元组中。然后可以优雅地应用它:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
一切都很好,但是当给定的流程正在执行 'filter' 操作时 - 这个组合器被卡住并停止处理进一步的事件。
我猜这是由于 'Zip' 要求所有子流都执行相同操作的行为 - 在我的例子中,一个分支直接传递给定对象,因此另一个子流无法忽略此元素。 filter(),因为它确实如此 - 流程停止,因为 Zip 正在等待推送。
有没有更好的方法来实现流组合?
当 'flow' 忽略带有 'filter' 的元素时,我可以在 tupledFlow 中做些什么来获得所需的行为?
两种可能的方法 - 有争议的优雅 - 是:
1) 避免使用过滤阶段,将您的过滤器变为 Flow[Int, Option[Int], NotUsed]
。通过这种方式,您可以像最初的计划一样在整个图形周围应用压缩包装。但是,代码看起来更脏,并且通过传递 None
s 增加了开销。
val stringIfEvenOrNone = Flow[Int].map{
case x if x % 2 == 0 => Some(x.toString)
case _ => None
}
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{
case (num, Some(str)) => (num,str)
}
2) 分离过滤和转换阶段,并在压缩包装器之前应用过滤阶段。可能是更轻量级和更好的折衷方案。
val filterEven = Flow[Int].filter(_ % 2 == 0)
val toString = Flow[Int].map(_.toString)
val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString))
编辑
3) 根据评论中的讨论,为了清楚起见,在此发布另一个解决方案。
此流包装器允许从给定流中发出每个元素,并与生成它的原始输入元素配对。它适用于任何类型的内部流(为每个输入发射 0、1 或更多元素)。
def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] =
Flow[In].flatMapConcat(in => Source.single(in).via(flow).map( out => in -> out))
我想出了一个 TupledFlow
的实现,当包装 Flow
使用 filter()
或 mapAsync()
并且包装时 Flow
发出 0,1或每个输入的 N 个元素:
def tupledFlow[In,Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext):Flow[In, (In,Out), NotUsed] = {
val v:Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In =>
val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq)
val bothFuture: Future[Seq[(In,Out)]] = outFuture.map( seqOfOut => seqOfOut.map((in,_)) )
bothFuture
}
val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable])
onlyDefined
}
我在这里看到的唯一缺点是我正在为单个实体实例化和具体化流程 - 只是为了获得 'calling a flow as a function'.
的概念
我没有对此进行任何性能测试 - 然而,由于繁重的工作是在一个包装好的 Flow
中完成的,它会在未来执行 - 我相信这会没问题。
此实现通过了 https://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala
中的所有测试
我正在寻找一种轻松重用 akka-stream 流的方法。
我将打算重用的 Flow 视为函数,因此我想保留其签名:
Flow[Input, Output, NotUsed]
现在,当我使用此流程时,我希望能够 'call' 此流程并将结果放在一边以供进一步处理。
所以我想从 Flow emitting [Input]
开始,应用我的 flow,然后继续 Flow emitting [(Input, Output)]
。
示例:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
现在这不可能以直接的方式实现,因为将流与 .via()
结合起来会让我流发出 [Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
另一种方法是让我的可重用流程发出 [(Input, Output)]
但这需要每个流程将其输入推送到所有阶段并使我的代码看起来很糟糕。
所以我想出了这样一个组合器:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
}
将输入广播到流中,并直接在平行线中广播 -> 都到 'Zip' 阶段,在该阶段我将值连接到一个元组中。然后可以优雅地应用它:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
一切都很好,但是当给定的流程正在执行 'filter' 操作时 - 这个组合器被卡住并停止处理进一步的事件。
我猜这是由于 'Zip' 要求所有子流都执行相同操作的行为 - 在我的例子中,一个分支直接传递给定对象,因此另一个子流无法忽略此元素。 filter(),因为它确实如此 - 流程停止,因为 Zip 正在等待推送。
有没有更好的方法来实现流组合? 当 'flow' 忽略带有 'filter' 的元素时,我可以在 tupledFlow 中做些什么来获得所需的行为?
两种可能的方法 - 有争议的优雅 - 是:
1) 避免使用过滤阶段,将您的过滤器变为 Flow[Int, Option[Int], NotUsed]
。通过这种方式,您可以像最初的计划一样在整个图形周围应用压缩包装。但是,代码看起来更脏,并且通过传递 None
s 增加了开销。
val stringIfEvenOrNone = Flow[Int].map{
case x if x % 2 == 0 => Some(x.toString)
case _ => None
}
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{
case (num, Some(str)) => (num,str)
}
2) 分离过滤和转换阶段,并在压缩包装器之前应用过滤阶段。可能是更轻量级和更好的折衷方案。
val filterEven = Flow[Int].filter(_ % 2 == 0)
val toString = Flow[Int].map(_.toString)
val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString))
编辑
3) 根据评论中的讨论,为了清楚起见,在此发布另一个解决方案。
此流包装器允许从给定流中发出每个元素,并与生成它的原始输入元素配对。它适用于任何类型的内部流(为每个输入发射 0、1 或更多元素)。
def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] =
Flow[In].flatMapConcat(in => Source.single(in).via(flow).map( out => in -> out))
我想出了一个 TupledFlow
的实现,当包装 Flow
使用 filter()
或 mapAsync()
并且包装时 Flow
发出 0,1或每个输入的 N 个元素:
def tupledFlow[In,Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext):Flow[In, (In,Out), NotUsed] = {
val v:Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In =>
val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq)
val bothFuture: Future[Seq[(In,Out)]] = outFuture.map( seqOfOut => seqOfOut.map((in,_)) )
bothFuture
}
val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable])
onlyDefined
}
我在这里看到的唯一缺点是我正在为单个实体实例化和具体化流程 - 只是为了获得 'calling a flow as a function'.
的概念我没有对此进行任何性能测试 - 然而,由于繁重的工作是在一个包装好的 Flow
中完成的,它会在未来执行 - 我相信这会没问题。
此实现通过了 https://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala
中的所有测试