将 `func(in: Source[A]) : Source[B]` 转换为 `Flow[A, B]`

Turn `func(in: Source[A]) : Source[B]` into a `Flow[A, B]`

我正在使用 akka-grpc 生成客户端绑定。它们的形式通常为

func[A, B](in: Source[A]) : Source[B],

即他们消费 Source[A] 并提供 Source[B].

现在,我想将 func 变成 Flow[A, B] 以将它们与 akka-stream 一起使用。

解决方法是:

  def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] = 
    Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }

它使用prefixAndTail劫持底层Source