Creating a flow from actor in Akka Streams

It's possible to create sources and sinks from actors using the Source.actorPublisher() and Sink.actorSubscriber() methods respectively. But is it possible to create a Flow from an actor?

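Conceptually there doesn't seem to be a good reason not to, given that the actor implements both the ActorPublisher and ActorSubscriber traits, but unfortunately the Flow object doesn't have any method for doing this. In this excellent blog post it's done in an earlier version of Akka Streams, so the question is whether it's also possible in the latest (2.4.9) version.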

I'm part of the Akka team and would like to use this question to clarify a few things about the raw Reactive Streams interfaces. I hope you'll find this useful.

Most notably, we'll soon be publishing multiple posts on the Akka team blog about building custom stages, including Flows, so keep an eye on it.

Don't use ActorPublisher / ActorSubscriber

Please don't use ActorPublisher and ActorSubscriber. They're too low level, and you might end up implementing them in a way that violates the Reactive Streams specification. They're a relic of the past, and even then they were "power-user mode only". There really is no reason to use those classes nowadays. We never provided a way to build a flow because the complexity is simply explosive if it is exposed as a "raw" Actor API for you to implement while getting all the rules right.

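If you really, really want to implement the raw Reactive Streams interfaces, then please do use the Specification's TCK to verify that your implementation is correct. You will likely be caught off guard by some of the more complex corner cases that a Flow (or, in RS terminology, a Processor) has to handle.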

Most operations can be built without going low-level

You should be able to build many flows simply by starting from a Flow[T] and adding the needed operations onto it, for example:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

This is a reusable description of the Flow.

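Since you're asking about power-user mode, this is the most powerful operator on the DSL itself: statefulMapConcat. The vast majority of operations that work on plain stream elements can be expressed with it: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T].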
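As a minimal sketch of how the statefulMapConcat operator named in the signature above can carry state, the following pairs each element with a running index. The names StatefulMapConcatExample, zipWithIndexFlow and run are made up for illustration, and the sketch assumes an Akka version in which ActorMaterializer is still available (it is deprecated in newer releases).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka or of the original answer.
object StatefulMapConcatExample {

  // The outer function runs once per materialization, so every running
  // stream gets its own `index` counter. The inner function runs per element.
  val zipWithIndexFlow: Flow[String, (String, Long), NotUsed] =
    Flow[String].statefulMapConcat[(String, Long)] { () =>
      var index = 0L
      elem => {
        val result = (elem, index)
        index += 1
        List(result) // emit exactly one output element per input element
      }
    }

  // Runs the flow over three elements and collects the output.
  def run(): Seq[(String, Long)] = {
    implicit val system: ActorSystem = ActorSystem("stateful-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(List("a", "b", "c")).via(zipWithIndexFlow).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Because the mutable counter lives inside the outer function, the flow value itself stays a reusable, shareable description.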

If you need timers, you could zip with a Source.tick, for example.
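One way to read the timer suggestion is to zip the data stream with a periodic tick source so that each element waits for a tick. The sketch below uses Source.tick for that; the 50 ms interval and the names TickZipExample and run are arbitrary choices for illustration, and it again assumes ActorMaterializer is available.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka or of the original answer.
object TickZipExample {

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("tick-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Emits () immediately and then every 50 ms (interval assumed for the example).
      val ticks = Source.tick(0.millis, 50.millis, ())
      // zip emits a pair only when both sides have an element, so the numbers
      // are released at the tick rate; the tick itself is then dropped.
      val paced = Source(1 to 3).zip(ticks).map { case (elem, _) => elem }
      Await.result(paced.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

The zipped stream completes as soon as the finite source completes, even though the tick source never ends on its own.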

GraphStage is the simplest and safest API for building custom stages

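Instead, building Sources/Flows/Sinks has its own powerful and safe API: the GraphStage. Please read the documentation about building custom GraphStages (they can be a Sink/Source/Flow or even any arbitrary shape). It handles all of the complex Reactive Streams rules for you, while giving you full freedom and type-safety when implementing your stages (which could be a Flow).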

For example, taken from the docs, here is a GraphStage implementation of the filter(T => Boolean) operator:

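import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// A stage with one inlet and one outlet that forwards only elements satisfying `p`.
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {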

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

It also handles asynchronous channels and is fusable by default.

In addition to the docs, these blog posts explain in detail why this API is the holy grail of building custom stages of any shape:

Konrad's solution demonstrates how to create a custom stage that uses Actors, but in most cases I think that is a bit of overkill.

Usually you have some Actor that is capable of responding to questions:

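import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

val actorRef: ActorRef = ???

type Input = ???   // placeholder for the request message type
type Output = ???  // placeholder for the reply message type

// `?` needs an implicit Timeout; 5 seconds is an assumed value
implicit val askTimeout: Timeout = Timeout(5.seconds)

val queryActor: Input => Future[Output] =
  input => (actorRef ? input).mapTo[Output]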

This can easily be used with basic Flow functionality, which takes the maximum number of concurrent requests as a parameter:

val actorQueryFlow : Int => Flow[Input, Output, _] =
  (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)

Now actorQueryFlow can be integrated into any stream...
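To make the placeholders concrete, here is a minimal end-to-end sketch of the same ask-plus-mapAsync pattern. LengthActor, AskFlowExample, the String and Int message types and the timeout values are all assumptions made for illustration; they do not come from the original answer. The sketch also assumes ActorMaterializer is available in the Akka version in use.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical actor used only for this example: replies with the length of each String.
class LengthActor extends Actor {
  def receive: Receive = {
    case s: String => sender() ! s.length
  }
}

// Hypothetical example object wiring the actor into a stream.
object AskFlowExample {

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ask-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val askTimeout: Timeout = Timeout(3.seconds) // assumed value, required by `?`

    val actorRef: ActorRef = system.actorOf(Props(new LengthActor))

    // Each element becomes one request; the reply is cast to the expected type.
    val queryActor: String => Future[Int] =
      input => (actorRef ? input).mapTo[Int]

    // mapAsync keeps at most `parallelism` requests in flight and preserves element order.
    val actorQueryFlow: Int => Flow[String, Int, NotUsed] =
      parallelism => Flow[String].mapAsync(parallelism)(queryActor)

    try {
      val result = Source(List("a", "bb", "ccc")).via(actorQueryFlow(2)).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Note that this pattern expects exactly one reply per request: if the actor does not answer within the timeout, the ask future fails and, with default supervision, so does the stream.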

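Here is a solution built using a graph stage. The actor has to acknowledge every message in order to produce back-pressure. The actor is notified when the stream fails or completes, and the stream fails when the actor terminates. This can be useful if you don't want to use ask, e.g. when not every input message has a corresponding output message.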

import akka.actor.{ActorRef, Status, Terminated}
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

object ActorRefBackpressureFlowStage {
  case object StreamInit
  case object StreamAck
  case object StreamCompleted
  case class StreamFailed(ex: Throwable)
  case class StreamElementIn[A](element: A)
  case class StreamElementOut[A](element: A)
}

/**
  * Sends the elements of the stream to the given `ActorRef`, which sends back a back-pressure signal.
  * The first message is always `StreamInit`; the stream then waits for a `StreamAck` message
  * from the given actor, which means that it is ready to process
  * elements. A `StreamAck` message is also required after each stream element
  * to make back-pressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages.
  *
  * The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will
  * be emitted downstream when there is demand.
  *
  * If the target actor terminates, the stage will fail with a WatchedActorTerminatedException.
  * When the stream is completed successfully, a `StreamCompleted` message
  * will be sent to the destination actor.
  * When the stream is completed with failure, a `StreamFailed(ex)` message will be sent to the destination actor.
  */
class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {

  import ActorRefBackpressureFlowStage._

  val in: Inlet[In] = Inlet("ActorFlowIn")
  val out: Outlet[Out] = Outlet("ActorFlowOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    private lazy val self = getStageActor {
      case (_, StreamAck) =>
        if(firstPullReceived) {
          if (!isClosed(in) && !hasBeenPulled(in)) {
            pull(in)
          }
        } else {
          pullOnFirstPullReceived = true
        }

      case (_, StreamElementOut(elemOut)) =>
        val elem = elemOut.asInstanceOf[Out]
        emit(out, elem)

      case (_, Terminated(targetRef)) =>
        failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))

      case (actorRef, unexpected) =>
        failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`."))
    }
    var firstPullReceived: Boolean = false
    var pullOnFirstPullReceived: Boolean = false

    override def preStart(): Unit = {
      //initialize stage actor and watch flow actor.
      self.watch(flowActor)
      tellFlowActor(StreamInit)
    }

    setHandler(in, new InHandler {

      override def onPush(): Unit = {
        val elementIn = grab(in)
        tellFlowActor(StreamElementIn(elementIn))
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        tellFlowActor(StreamFailed(ex))
        super.onUpstreamFailure(ex)
      }

      override def onUpstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onUpstreamFinish()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if(!firstPullReceived) {
          firstPullReceived = true
          if(pullOnFirstPullReceived) {
            if (!isClosed(in) && !hasBeenPulled(in)) {
              pull(in)
            }
          }
        }

      }

      override def onDownstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onDownstreamFinish()
      }
    })

    private def tellFlowActor(message: Any): Unit = {
      flowActor.tell(message, self.ref)
    }

  }

  override def shape: FlowShape[In, Out] = FlowShape(in, out)

}