Akka-streams:在流程开始时执行操作

Akka-streams: execute action on flow start

akka-streams

中有流程描述
val flow: Flow[Input, Output, Unit] = ???

,如何修改它以获得新的流程描述,在开始时执行指定的副作用,即当流程具体化时?

你自己说的,使用实体化的力量:

val newFlow = flow.mapMaterializedValue(_ ⇒ println("materialized"))

流处理图的开始具体化将同时使其一块一块地运行。执行保证在第一个元素传递到该图中某处之前发生的操作的唯一方法是在具体化该图之前执行该操作。从这个意义上说,sschaef 的回答有点不正确:使用 mapMaterializedValue 很早就运行了动作,但不能保证在处理第一个元素之前就发生了。

如果我们在这里谈论 Flow,它只在一侧接受输入并在另一侧产生输出——即它不包含内部循环或数据源——那么在第一个元素到达之前要执行操作,您可以做的一件事是将一个处理步骤附加到它的输入上:

def effectSource[T](block: => Unit) = Source.fromIterator(() => {block; Iterator.empty})
val newFlow = Flow[Input].prepend(effectSource(/* do stuff */)).via(flow)

以上是使用即将到来的 2.0 语法,在 Akka Streams 1.0 中它将是 Source(() => { block; Iterator.empty }) 并且需要使用 FlowGraph DSL 完成前置操作(可以找到该图 here) .