'Graph must be connected' 使用从发布者+订阅者参与者创建的流程

'Graph must be connected' with Flow created from Publisher+Subscriber actor

class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  var events = Seq.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
  override def receive: Actor.Receive = {
    case OnNext(e: Int) => events = e +: events
    case Request(cnt) => events.take(cnt.toInt).foreach(onNext)
  }
}

val pubsubRef = system.actorOf(Props(new ActorPubSub))
val pub = ActorPublisher[Int](pubsubRef)
val sub = ActorSubscriber[Int](pubsubRef)
val pubsubFlow = Flow(Sink(sub), Source(pub))

FlowGraph { implicit b =>
  import akka.stream.scaladsl.FlowGraphImplicits._

  Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
    println("Got a number " + e)
  )
}.run()

根据 Flow.apply(Sink, Source) 文档:

Create a Flow from a seemingly disconnected Source and Sink pair.

如果这是真的,为什么图仍然没有连接?

Endre@akka-user:

This is a known issue and will be fully fixed in M4.