将 `func(in: Source[A]) : Source[B]` 转换为 `Flow[A, B]`
Turn `func(in: Source[A]) : Source[B]` into a `Flow[A, B]`
我正在使用 akka-grpc 生成客户端绑定。它们的形式通常为
func[A, B](in: Source[A]) : Source[B]
,
即他们消费 Source[A]
并提供 Source[B]
.
现在,我想将 func
变成 Flow[A, B]
以将它们与 akka-stream 一起使用。
解决方法是:
def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }
它使用prefixAndTail
劫持底层Source
。
我正在使用 akka-grpc 生成客户端绑定。它们的形式通常为
func[A, B](in: Source[A]) : Source[B]
,
即他们消费 Source[A]
并提供 Source[B]
.
现在,我想将 func
变成 Flow[A, B]
以将它们与 akka-stream 一起使用。
解决方法是:
def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }
它使用prefixAndTail
劫持底层Source
。