并非所有 akka stream Sink 都接收到发出的数据
Not all akka stream Sinks receive the emitted data
当 运行 以下 akka streaming FlowGraph 并非所有发出的字符都被所有接收器接收时。
package sample.stream
import java.io.{ FileOutputStream, PrintWriter }
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Broadcast, FlowGraph, Sink, Source }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success, Try }
object Sample {
def main(args: Array[String]): Unit = {
println("start")
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
var counter = -1
val countSource: Source[Char, Unit] = Source(() => Iterator.continually { counter += 1; (counter + 'A').toChar }.take(11))
var counter1 = 0
val consoleSink1 = Sink.foreach[Char] { counter =>
println("sink1:" + counter1 + ":" + counter)
counter1 += 1
Thread.sleep(100)
//Thread.sleep(300)
}
var counter2 = 0
val consoleSink2 = Sink.foreach[Char] { counter =>
println("sink2:" + counter2 + ":" + counter)
counter2 += 1
Thread.sleep(200)
}
val materialized = FlowGraph.closed(consoleSink1, consoleSink2)((x1, x2) => x1) { implicit builder =>
(console1, console2) =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Char](2))
countSource ~> broadcast ~> console1
broadcast ~> console2
}.run()
// ensure the output file is closed and the system shutdown upon completion
materialized.onComplete {
case Success(_) =>
system.shutdown()
case Failure(e) =>
println(s"Failure: ${e.getMessage}")
system.shutdown()
}
println("waiting the remaining ones")
//scala.concurrent.Await.ready(materialized, scala.concurrent.duration.DurationInt(100).seconds)
//system.shutdown()
println("end")
}
}
在运行之后生成以下输出
[info] Running sample.stream.Sample
[info] start
[info] waiting the remaining ones
[info] end
[info] sink2:0:A
[info] sink1:0:A
[info] sink1:1:B
[info] sink1:2:C
[info] sink2:1:B
[info] sink1:3:D
[info] sink2:2:C
[info] sink1:4:E
[info] sink1:5:F
[info] sink2:3:D
[info] sink1:6:G
[info] sink1:7:H
[info] sink2:4:E
[info] sink2:5:F
[info] sink1:8:I
[info] sink1:9:J
[info] sink2:6:G
[info] sink2:7:H
[info] sink1:10:K
第二个接收器没有收到第 8 个、第 9 个和第 10 个值:IJK,但整个流程仍然结束。
我应该怎么做才能让两个Sink都消耗完数据?
我发现如果我将 (x1,x2)=>x1
更改为 (x1,x2)=>x2
这将等待。这与在第一个接收器中休眠 300 毫秒相同。
您传递给 FlowGraph.closed
第二个参数列表的函数决定了当您 运行 流时 return 编辑的具体化值。因此,当您传入 (x1,x2)=>x1
时,您会 return 一个在第一个接收器获取所有元素时完成的未来,然后该未来的回调将关闭 actor 系统,而第二个接收器没有机会接收所有元素元素。
相反,您应该退出两个 future 并仅在两个 future 都完成后才关闭系统。
您实际上可以看到在某些 akka-stream
测试中如何使用这种方法 here。
当 运行 以下 akka streaming FlowGraph 并非所有发出的字符都被所有接收器接收时。
package sample.stream
import java.io.{ FileOutputStream, PrintWriter }
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Broadcast, FlowGraph, Sink, Source }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success, Try }
object Sample {
def main(args: Array[String]): Unit = {
println("start")
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
var counter = -1
val countSource: Source[Char, Unit] = Source(() => Iterator.continually { counter += 1; (counter + 'A').toChar }.take(11))
var counter1 = 0
val consoleSink1 = Sink.foreach[Char] { counter =>
println("sink1:" + counter1 + ":" + counter)
counter1 += 1
Thread.sleep(100)
//Thread.sleep(300)
}
var counter2 = 0
val consoleSink2 = Sink.foreach[Char] { counter =>
println("sink2:" + counter2 + ":" + counter)
counter2 += 1
Thread.sleep(200)
}
val materialized = FlowGraph.closed(consoleSink1, consoleSink2)((x1, x2) => x1) { implicit builder =>
(console1, console2) =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Char](2))
countSource ~> broadcast ~> console1
broadcast ~> console2
}.run()
// ensure the output file is closed and the system shutdown upon completion
materialized.onComplete {
case Success(_) =>
system.shutdown()
case Failure(e) =>
println(s"Failure: ${e.getMessage}")
system.shutdown()
}
println("waiting the remaining ones")
//scala.concurrent.Await.ready(materialized, scala.concurrent.duration.DurationInt(100).seconds)
//system.shutdown()
println("end")
}
}
在运行之后生成以下输出
[info] Running sample.stream.Sample
[info] start
[info] waiting the remaining ones
[info] end
[info] sink2:0:A
[info] sink1:0:A
[info] sink1:1:B
[info] sink1:2:C
[info] sink2:1:B
[info] sink1:3:D
[info] sink2:2:C
[info] sink1:4:E
[info] sink1:5:F
[info] sink2:3:D
[info] sink1:6:G
[info] sink1:7:H
[info] sink2:4:E
[info] sink2:5:F
[info] sink1:8:I
[info] sink1:9:J
[info] sink2:6:G
[info] sink2:7:H
[info] sink1:10:K
第二个接收器没有收到第 8 个、第 9 个和第 10 个值:IJK,但整个流程仍然结束。
我应该怎么做才能让两个Sink都消耗完数据?
我发现如果我将 (x1,x2)=>x1
更改为 (x1,x2)=>x2
这将等待。这与在第一个接收器中休眠 300 毫秒相同。
您传递给 FlowGraph.closed
第二个参数列表的函数决定了当您 运行 流时 return 编辑的具体化值。因此,当您传入 (x1,x2)=>x1
时,您会 return 一个在第一个接收器获取所有元素时完成的未来,然后该未来的回调将关闭 actor 系统,而第二个接收器没有机会接收所有元素元素。
相反,您应该退出两个 future 并仅在两个 future 都完成后才关闭系统。
您实际上可以看到在某些 akka-stream
测试中如何使用这种方法 here。