'Graph must be connected':使用由 Publisher+Subscriber actor 创建的 Flow 时出现该错误
'Graph must be connected' with Flow created from Publisher+Subscriber actor
/**
 * Actor that is both a stream subscriber and a publisher of `Int`.
 *
 * Every element received through `OnNext` is queued and re-emitted downstream
 * exactly once, in arrival order, as soon as downstream demand is available.
 *
 * Upstream completion/error and downstream cancellation are not handled here.
 */
class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  /** Elements received from upstream that have not been emitted yet, oldest first. */
  var events: Seq[Int] = Vector.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive: Actor.Receive = {
    case OnNext(e: Int) =>
      // Append (not prepend) so elements leave in the order they arrived.
      events = events :+ e
      deliverBuffered()
    case Request(_) =>
      // The requested amount is already reflected in totalDemand.
      deliverBuffered()
  }

  /**
   * Emits as many queued elements as the current demand allows and removes
   * them from the queue so that no element is published twice.
   */
  private def deliverBuffered(): Unit =
    if (isActive && totalDemand > 0) {
      // Clamp before narrowing so a very large demand cannot wrap around.
      val batchSize = math.min(totalDemand, Int.MaxValue).toInt
      val (ready, pending) = events.splitAt(batchSize)
      events = pending
      ready.foreach(onNext)
    }
}
// Start a single ActorPubSub and view the same actor through both of its roles.
val pubsubRef = system.actorOf(Props(new ActorPubSub))
val sub = ActorSubscriber[Int](pubsubRef)
val pub = ActorPublisher[Int](pubsubRef)
// Pair the subscriber-backed Sink with the publisher-backed Source as one Flow.
val pubsubFlow = Flow(Sink(sub), Source(pub))
// Wire 1..10 through the actor-backed flow into a printing sink, then run the graph.
FlowGraph { implicit b =>
  import akka.stream.scaladsl.FlowGraphImplicits._
  val numbers = Source(List.range(1, 11))
  val printer = Sink.foreach[Int] { n => println("Got a number " + n) }
  numbers ~> pubsubFlow ~> printer
}.run()
根据 Flow.apply(Sink, Source)
文档:
Create a Flow from a seemingly disconnected Source and Sink pair.
如果这是真的,为什么图仍然没有连接?
This is a known issue and will be fully fixed in M4.
/**
 * Actor that is both a stream subscriber and a publisher of `Int`.
 *
 * Every element received through `OnNext` is queued and re-emitted downstream
 * exactly once, in arrival order, as soon as downstream demand is available.
 *
 * Upstream completion/error and downstream cancellation are not handled here.
 */
class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  /** Elements received from upstream that have not been emitted yet, oldest first. */
  var events: Seq[Int] = Vector.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive: Actor.Receive = {
    case OnNext(e: Int) =>
      // Append (not prepend) so elements leave in the order they arrived.
      events = events :+ e
      deliverBuffered()
    case Request(_) =>
      // The requested amount is already reflected in totalDemand.
      deliverBuffered()
  }

  /**
   * Emits as many queued elements as the current demand allows and removes
   * them from the queue so that no element is published twice.
   */
  private def deliverBuffered(): Unit =
    if (isActive && totalDemand > 0) {
      // Clamp before narrowing so a very large demand cannot wrap around.
      val batchSize = math.min(totalDemand, Int.MaxValue).toInt
      val (ready, pending) = events.splitAt(batchSize)
      events = pending
      ready.foreach(onNext)
    }
}
// Start a single ActorPubSub and view the same actor through both of its roles.
val pubsubRef = system.actorOf(Props(new ActorPubSub))
val sub = ActorSubscriber[Int](pubsubRef)
val pub = ActorPublisher[Int](pubsubRef)
// Pair the subscriber-backed Sink with the publisher-backed Source as one Flow.
val pubsubFlow = Flow(Sink(sub), Source(pub))
// Wire 1..10 through the actor-backed flow into a printing sink, then run the graph.
FlowGraph { implicit b =>
  import akka.stream.scaladsl.FlowGraphImplicits._
  val numbers = Source(List.range(1, 11))
  val printer = Sink.foreach[Int] { n => println("Got a number " + n) }
  numbers ~> pubsubFlow ~> printer
}.run()
根据 Flow.apply(Sink, Source)
文档:
Create a Flow from a seemingly disconnected Source and Sink pair.
如果这是真的,为什么图仍然没有连接?
This is a known issue and will be fully fixed in M4.