根据 Source 的物化值创建 Sink

Create Sink based on materialised value of a Source

我想围绕 Pubsub 主题构建流

\-----------------------------------------------------\
 \  ------------------  ------------    -------------  \
  > > wrapWithPublish > > toPubsub |    | fromPubsub >  >
 /  ------------------  ------------    -------------  /
/-----------------------------------------------------/

这是我到目前为止写的代码

def mediatorFlow[In, Out](mediator: ActorRef, topic: String): Flow[In, Out, Unit] = {
  val source =
    Source
      .actorRef[Out](10, OverflowStrategy.dropHead)
      .mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }

  val wrapWithPublish =
    Flow[In].map(DistributedPubSubMediator.Publish(topic, _))

  val unsubscribe = DistributedPubSubMediator.Unsubscribe(topic, ref???)

  val toPubsub =
    Sink.actorRef[DistributedPubSubMediator.Publish](mediator, unsubscribe)

  Flow.fromSinkAndSource(wrapWithPublish to toPubsub, source)
}

问题出在unsubscribe的定义上,我想在流的末尾发送一个DistributedPubSubMediator.Subscribe,指定一个ref应该是ref的物化值source 以上定义。

我知道 Pubsub 会在流结束时自动取消订阅 Actor。但是我很好奇怎么解决这个问题。

要实现这一点,您需要构建一个比 fromSinkAndSource 更紧密的流,您需要使用 GraphDSL:

val source = ... // as above
Flow.fromGraph(GraphDSL.create(source) { implicit b =>
  src =>
    import GraphDSL.Implicits._
    val concat = b.add(Concat[Any](2))
    val wrapWithPublish = b.add(Flow[In].map(DistributedPubSubMediator.Publish(topic, _)))
    val toPubSub = b.add(Sink.actorRef[Any](mediator, unsubscribe))

    wrapWithPublish ~> concat ~> toPubSub
    b.materializedValue.map(DistributedPubSubMediator.Unsubscribe(topic, _)) ~> concat

    FlowShape(wrapWithPublish.in, src.out)
})

通过这种方式,您可以将其中一个部分的物化值注入到流元素的级别,使其易于发送到 pubsub 中介。