如何使用 akka 类型在 akka 流中使用 ActorSink.actorRefWithBackpressure
how to use ActorSink.actorRefWithBackpressure in akka stream using akka typed
我正在尝试使用 akka 类型来学习 akka 流,关于 akka 类型的文档有点抽象
Sink.actorRefWithBackpressure
示例非常简单易懂
演员Sink.actorRefWithBackpressure
示例是抽象的
在第一个示例中,我们有 AckingReceiver
执行所需工作的演员,但在第二个示例中
没有像 AckingReceiver
中那样实施案例 类
val actor: ActorRef[Protocol] = targetActor()
我在某些地方看到过这段代码,但我也无法理解它
def targetActor(): ActorRef[Protocol] = ???
我们如何提供处理案例的目标参与者的实现类
任何帮助将不胜感激
ActorRef[Protocol]
和其他演员一样是类型演员。在 typed 中的 ActorSystem
之外获取 ActorRef
比在 classic 中更复杂,这可能是文档省略它的原因(因为它对于解释如何使用 ActorSink.actorRefWithBackpressure
并不重要)。
通常你会设置一个类型化的 ActorSystem
并要求 ActorSystem
得到一个 ActorRef
:
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl._
object MainSystem {
sealed trait Command
case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])
sealed trait Reply
case class ProtocolActorIs(actor: ActorRef[Protocol])
def apply(): Behavior[Command] =
Behaviors.receive { (context, msg) =>
case ObtainProtocolActor(replyTo) =>
val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
// Define the protocol actor
Behaviors.receive[Protocol] { (context, msg) =>
case Init(ackTo) =>
println(s"Actor ${context.self.path} initializing")
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
println(s"Actor ${context.self.path} received $msg")
ackTo ! Ack
Behaviors.same
case Complete =>
context.stop() // Delayed until the message is processed
ackTo ! Ack
Behaviors.same
case Fail(ex) =>
println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
Behaviors.same
})
context.watch(protocolActor)
replyTo ! ProtocolActorIs(protocolActor)
}.receiveSignal {
case (context, Terminated(ref)) =>
println(s"Actor ${ref.path} terminated")
}
}
val actorSystem = ActorSystem(MainSystem(), "main")
def targetActor(): ActorRef[Protocol] = Await.result(
actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
15.second
)
这可能显示了经典和打字之间两个最大的实用但可能不明显的区别:
- typed 中的
ActorSystem
是一个演员(在这个例子中实际上可能有一个 ActorRef[Protocol]
是 ActorSystem
,尽管你不太可能真的想这样做)
- 询问模式发生了相当大的变化
我正在尝试使用 akka 类型来学习 akka 流,关于 akka 类型的文档有点抽象
Sink.actorRefWithBackpressure 示例非常简单易懂 演员Sink.actorRefWithBackpressure 示例是抽象的
在第一个示例中,我们有 AckingReceiver
执行所需工作的演员,但在第二个示例中
没有像 AckingReceiver
val actor: ActorRef[Protocol] = targetActor()
我在某些地方看到过这段代码,但我也无法理解它
def targetActor(): ActorRef[Protocol] = ???
我们如何提供处理案例的目标参与者的实现类 任何帮助将不胜感激
ActorRef[Protocol]
和其他演员一样是类型演员。在 typed 中的 ActorSystem
之外获取 ActorRef
比在 classic 中更复杂,这可能是文档省略它的原因(因为它对于解释如何使用 ActorSink.actorRefWithBackpressure
并不重要)。
通常你会设置一个类型化的 ActorSystem
并要求 ActorSystem
得到一个 ActorRef
:
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl._
object MainSystem {
sealed trait Command
case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])
sealed trait Reply
case class ProtocolActorIs(actor: ActorRef[Protocol])
def apply(): Behavior[Command] =
Behaviors.receive { (context, msg) =>
case ObtainProtocolActor(replyTo) =>
val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
// Define the protocol actor
Behaviors.receive[Protocol] { (context, msg) =>
case Init(ackTo) =>
println(s"Actor ${context.self.path} initializing")
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
println(s"Actor ${context.self.path} received $msg")
ackTo ! Ack
Behaviors.same
case Complete =>
context.stop() // Delayed until the message is processed
ackTo ! Ack
Behaviors.same
case Fail(ex) =>
println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
Behaviors.same
})
context.watch(protocolActor)
replyTo ! ProtocolActorIs(protocolActor)
}.receiveSignal {
case (context, Terminated(ref)) =>
println(s"Actor ${ref.path} terminated")
}
}
val actorSystem = ActorSystem(MainSystem(), "main")
def targetActor(): ActorRef[Protocol] = Await.result(
actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
15.second
)
这可能显示了经典和打字之间两个最大的实用但可能不明显的区别:
- typed 中的
ActorSystem
是一个演员(在这个例子中实际上可能有一个ActorRef[Protocol]
是ActorSystem
,尽管你不太可能真的想这样做) - 询问模式发生了相当大的变化