如何使用 akka 类型在 akka 流中使用 ActorSink.actorRefWithBackpressure

how to use ActorSink.actorRefWithBackpressure in akka stream using akka typed

我正在尝试使用 akka 类型来学习 akka 流,关于 akka 类型的文档有点抽象

Sink.actorRefWithBackpressure 示例非常简单易懂 演员Sink.actorRefWithBackpressure 示例是抽象的

在第一个示例中,我们有 AckingReceiver 执行所需工作的演员,但在第二个示例中

没有像 AckingReceiver

中那样实施案例 类
val actor: ActorRef[Protocol] = targetActor()

我在某些地方看到过这段代码,但我也无法理解它

def targetActor(): ActorRef[Protocol] = ???

我们如何提供处理案例的目标参与者的实现类 任何帮助将不胜感激

ActorRef[Protocol] 和其他演员一样是类型演员。在 typed 中的 ActorSystem 之外获取 ActorRef 比在 classic 中更复杂,这可能是文档省略它的原因(因为它对于解释如何使用 ActorSink.actorRefWithBackpressure 并不重要)。

通常你会设置一个类型化的 ActorSystem 并要求 ActorSystem 得到一个 ActorRef:

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl._

object MainSystem {
  sealed trait Command
  case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])

  sealed trait Reply
  case class ProtocolActorIs(actor: ActorRef[Protocol])

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      case ObtainProtocolActor(replyTo) =>
        val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
          // Define the protocol actor
          Behaviors.receive[Protocol] { (context, msg) =>
            case Init(ackTo) =>
              println(s"Actor ${context.self.path} initializing")
              ackTo ! Ack
              Behaviors.same
            case Message(ackTo, msg) =>
              println(s"Actor ${context.self.path} received $msg")
              ackTo ! Ack
              Behaviors.same
            case Complete =>
              context.stop()  // Delayed until the message is processed
              ackTo ! Ack
              Behaviors.same
            case Fail(ex) =>
              println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
              Behaviors.same
          })
        context.watch(protocolActor)
        replyTo ! ProtocolActorIs(protocolActor)
    }.receiveSignal {
      case (context, Terminated(ref)) =>
        println(s"Actor ${ref.path} terminated")
    }
}

val actorSystem = ActorSystem(MainSystem(), "main")

def targetActor(): ActorRef[Protocol] = Await.result(
  actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
  15.second
)

这可能显示了经典和打字之间两个最大的实用但可能不明显的区别:

  • typed 中的 ActorSystem 是一个演员(在这个例子中实际上可能有一个 ActorRef[Protocol]ActorSystem,尽管你不太可能真的想这样做)
  • 询问模式发生了相当大的变化