Akka 在不访问流的情况下组合汇
Akka combining Sinks without access to Flows
我正在使用一个 API 接受单个 AKKA 接收器并用数据填充它:
def fillSink(sink:Sink[String, _])
有没有办法在不深入研究 akka 的情况下用两个接收器而不是一个接收器来处理输出?
例如
val mySink1:Sink = ...
val mySink2:Sink = ...
//something
fillSink( bothSinks )
如果我可以访问 fillSink
方法使用的流程,我可以使用 flow.alsoTo(mySink1).to(mySink2)
但流程不会公开。
目前唯一的解决方法是传递一个处理字符串的 Sink 并将其传递给两个 StringBuilder 以替换 mySink1/mySink2
,但感觉这违背了 AKKA 的要点。没有花几天时间学习 AKKA,我不知道是否有办法从接收器中分离输出。
谢谢!
combine
Sink 运算符使用提供的 Int => Graph[UniformFanOutShape[T, U], NotUsed]]
函数组合两个或多个 Sink,可能正是您要寻找的:
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]
一个简单的例子:
val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
val tripleSink = Sink.foreach[Int](i => println(s" Triper: ${i*3}"))
val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))
Source(List(1, 2, 3)).runWith(combinedSink)
// Doubler: 2
// Triper: 3
// Doubler: 4
// Triper: 6
// Doubler: 6
// Triper: 9
我正在使用一个 API 接受单个 AKKA 接收器并用数据填充它:
def fillSink(sink:Sink[String, _])
有没有办法在不深入研究 akka 的情况下用两个接收器而不是一个接收器来处理输出?
例如
val mySink1:Sink = ...
val mySink2:Sink = ...
//something
fillSink( bothSinks )
如果我可以访问 fillSink
方法使用的流程,我可以使用 flow.alsoTo(mySink1).to(mySink2)
但流程不会公开。
目前唯一的解决方法是传递一个处理字符串的 Sink 并将其传递给两个 StringBuilder 以替换 mySink1/mySink2
,但感觉这违背了 AKKA 的要点。没有花几天时间学习 AKKA,我不知道是否有办法从接收器中分离输出。
谢谢!
combine
Sink 运算符使用提供的 Int => Graph[UniformFanOutShape[T, U], NotUsed]]
函数组合两个或多个 Sink,可能正是您要寻找的:
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]
一个简单的例子:
val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
val tripleSink = Sink.foreach[Int](i => println(s" Triper: ${i*3}"))
val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))
Source(List(1, 2, 3)).runWith(combinedSink)
// Doubler: 2
// Triper: 3
// Doubler: 4
// Triper: 6
// Doubler: 6
// Triper: 9