如何使用 Akka 做类型安全、线程安全的边界?
How to do type safe, thread safe boundaries with Akka?
在 akka typed 中的参与者组之间实现模块边界的公认做法是什么?
TL/DR
下面是 working repo 示例。
我如何实现单个参与者接收在两个不同协议中定义的(预)消息,类似于在 OO 中实现两个不同的接口。
例子
关于边界,我指的是经典的 OO 接口边界:仅公开与其他模块相关的操作。
例如:考虑爱丽丝、鲍勃和查理。爱丽丝喜欢和鲍勃聊天,而查理经常想知道鲍勃最近怎么样。查理不知道爱丽丝(也不应该),反之亦然。每对之间存在一个协议,它们可以相互接收消息:
trait Protocol[ From, To ]
object Alice
{
sealed trait BobToAlice extends Protocol[ Bob, Alice ]
case object ApologizeToAlice extends BobToAlice
case object LaughAtAlice extends BobToAlice
}
object Bob
{
sealed trait AliceToBob extends Protocol[ Alice, Bob ]
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[ Charlie, Bob ]
case object HowYouDoinBob extends CharlieToBob
}
object Charlie
{
sealed trait BobToCharlie extends Protocol[ Bob, Charlie ]
case object CryToCharlie extends BobToCharlie
case object LaughToCharlie extends BobToCharlie
}
这里的边界是 Bob 的两张面孔:与 Alice 交谈和与 Charlie 交谈是两种不同的协议。每个人现在都可以与 Bob 交谈,而无需了解对方。例如,爱丽丝喜欢唱歌,但她唱歌时不会被人嘲笑:
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.{ ActorRef, Behavior }
class Alice( bob: ActorRef[ Protocol[ Alice, Bob ] ] )
{
import Alice._
import nl.papendorp.solipsism.protocol.Bob.{ ScoldBob, SingToBob }
val talkToBob: Behavior[ BobToAlice ] = Behaviors.receiveMessage
{
case LaughAtAlice =>
bob ! ScoldBob
same
case ApologizeToAlice =>
bob ! SingToBob
same
}
}
另一方面,查理只关心鲍勃此刻的感受:
import akka.actor.typed.scaladsl.Behaviors.{ receiveMessage, same }
import akka.actor.typed.{ ActorRef, Behavior }
class Charlie(bob: ActorRef[Protocol[Charlie,Bob]])
{
import Charlie._
import nl.papendorp.solipsism.protocol.Bob.HowYouDoinBob
val concerned: Behavior[BobToCharlie] = receiveMessage
{
case CryToCharlie =>
bob ! HowYouDoinBob
same
case LaughToCharlie =>
bob ! HowYouDoinBob
same
}
}
但是,Alice 对 Bob 情绪的影响会影响 Bob 与 Charlie 交谈的方式。为此,我们需要通过 BobsPersonalLife
统一两个协议,以便能够在单个参与者中表示它们:
import akka.actor.typed.scaladsl.Behaviors._
import akka.actor.typed.{ ActorRef, Behavior }
import Alice.BobToAlice
import Charlie.BobToCharlie
object Bob
{
private[ Bob ] sealed trait BobsPersonalLife
sealed trait AliceToBob extends Protocol[Alice, Bob] with BobsPersonalLife
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[Charlie, Bob] with BobsPersonalLife
case object HowYouDoinBob extends CharlieToBob
}
class Bob( alice: ActorRef[BobToAlice], charlie: ActorRef[BobToCharlie] )
{
import Alice._
import Bob._
import Charlie._
private val happy: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! LaughToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
sad
case SingToBob =>
alice ! LaughAtAlice
same
}
val sad: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! CryToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
same
case SingToBob =>
alice ! LaughAtAlice
happy
}
}
到目前为止,还不错。我们可以使用 ActorRef.narrow[ _X_ToBob ]
实例化 Alice 和 Charlie。但是鲍勃呢?或者更确切地说,鲍勃的另一个自我?如果我们想用 Boris 代替 Bob,Boris 不向 Charlie 抱怨而是向 Doris 抱怨,使用 DorisToBob extends Protocol[ Doris, Bob ]
,我们将无法再接收来自 Alice 的消息,因为 AliceToBob
和 [= 没有共享的超特征18=]。突然之间,BobsPersonalLife
锁定了所有 Bob Alice 可以与之交谈的对象。
用鲍里斯代替鲍勃的方法是什么?如果我们使用 ActorRef.unsafeUpcast
就会失去类型安全。如果我们在共享状态上使用两个参与者,我们就会失去线程安全性。包装 _X_ToBob(例如 Either[ AliceToBob, CharlieToBob ]
或 Dotty 的 shorthand 联合类型)也不起作用,因为包装器只是接管了 BobsPersonalLife
的角色。当我们让 DorisToBob
继承自 BobsPersonalLife
时,我们最终得到所有 Bob 的所有可能伙伴的联合,永远无法删除他们中的任何一个。
问题
我们如何才能在 Bob 内部实现 Alice 和 Charlie 之间真正类型安全的解耦?
我认为这接近于成为一个 X:Y 问题(“我如何在 Akka 中设置接口边界”与“我如何在 Akka 中实现接口边界的目标”)。
object Protocol {
sealed trait Message
sealed trait LaughReply extends Message
sealed trait MoodReply extends Message
case class Apology(from: ActorRef[Singing]) extends Message
case class Singing(from: ActorRef[Laughing]) extends Message
case class Laughing(from: ActorRef[LaughReply]) extends Message with MoodReply
case class HowYouDoin(replyTo: ActorRef[MoodReply]) extends Message with LaughReply
case class Scolding(from: ActorRef[Apology]) extends Message with LaughReply
case class Crying(from: ActorRef[HowYouDoin]) extends Message with MoodReply
}
object Alice {
val talkToBob: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Apology(from) =>
from ! Singing(context.self)
Behaviors.same
case Laughing(from) =>
from ! Scolding(context.self)
Behaviors.same
case _ => // Every other message is ignored by Alice
Behaviors.same
}
}
}
object Charlie {
val concerned: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Crying(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case Laughing(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}
object Bob {
val happy: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Laughing(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
sad
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
val sad: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Crying(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
Behaviors.same
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}
技巧基本上是通过 mixins 分解协议并在消息中编码协议状态(接受哪些消息)。只要没有人持有对 ActorRef[Message]
的引用(ActorRef
是逆变的,所以 ActorRef[LaughReply]
不是 ActorRef[Message]
),就无法发送消息target 尚未承诺接受。请注意,将 ActorRef
保持在 actor 的状态会积极地与此相反:如果您要将另一个 ActorRef
保持在 actor 的状态,那么在 IMO 中,这是一个非常强烈的信号,表明您不在都有兴趣将它们解耦。
替代方案,而不是总体协议,是为每个 Alice/Bob/Charlie/etc 制定协议。使用仅在该参与者的上下文中定义的命令和回复,并使用例如类型化的询问模式来使目标参与者的回复协议适应请求参与者的命令协议。
在 akka typed 中的参与者组之间实现模块边界的公认做法是什么?
TL/DR
下面是 working repo 示例。
我如何实现单个参与者接收在两个不同协议中定义的(预)消息,类似于在 OO 中实现两个不同的接口。
例子
关于边界,我指的是经典的 OO 接口边界:仅公开与其他模块相关的操作。
例如:考虑爱丽丝、鲍勃和查理。爱丽丝喜欢和鲍勃聊天,而查理经常想知道鲍勃最近怎么样。查理不知道爱丽丝(也不应该),反之亦然。每对之间存在一个协议,它们可以相互接收消息:
trait Protocol[ From, To ]
object Alice
{
sealed trait BobToAlice extends Protocol[ Bob, Alice ]
case object ApologizeToAlice extends BobToAlice
case object LaughAtAlice extends BobToAlice
}
object Bob
{
sealed trait AliceToBob extends Protocol[ Alice, Bob ]
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[ Charlie, Bob ]
case object HowYouDoinBob extends CharlieToBob
}
object Charlie
{
sealed trait BobToCharlie extends Protocol[ Bob, Charlie ]
case object CryToCharlie extends BobToCharlie
case object LaughToCharlie extends BobToCharlie
}
这里的边界是 Bob 的两张面孔:与 Alice 交谈和与 Charlie 交谈是两种不同的协议。每个人现在都可以与 Bob 交谈,而无需了解对方。例如,爱丽丝喜欢唱歌,但她唱歌时不会被人嘲笑:
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.{ ActorRef, Behavior }
class Alice( bob: ActorRef[ Protocol[ Alice, Bob ] ] )
{
import Alice._
import nl.papendorp.solipsism.protocol.Bob.{ ScoldBob, SingToBob }
val talkToBob: Behavior[ BobToAlice ] = Behaviors.receiveMessage
{
case LaughAtAlice =>
bob ! ScoldBob
same
case ApologizeToAlice =>
bob ! SingToBob
same
}
}
另一方面,查理只关心鲍勃此刻的感受:
import akka.actor.typed.scaladsl.Behaviors.{ receiveMessage, same }
import akka.actor.typed.{ ActorRef, Behavior }
class Charlie(bob: ActorRef[Protocol[Charlie,Bob]])
{
import Charlie._
import nl.papendorp.solipsism.protocol.Bob.HowYouDoinBob
val concerned: Behavior[BobToCharlie] = receiveMessage
{
case CryToCharlie =>
bob ! HowYouDoinBob
same
case LaughToCharlie =>
bob ! HowYouDoinBob
same
}
}
但是,Alice 对 Bob 情绪的影响会影响 Bob 与 Charlie 交谈的方式。为此,我们需要通过 BobsPersonalLife
统一两个协议,以便能够在单个参与者中表示它们:
import akka.actor.typed.scaladsl.Behaviors._
import akka.actor.typed.{ ActorRef, Behavior }
import Alice.BobToAlice
import Charlie.BobToCharlie
object Bob
{
private[ Bob ] sealed trait BobsPersonalLife
sealed trait AliceToBob extends Protocol[Alice, Bob] with BobsPersonalLife
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[Charlie, Bob] with BobsPersonalLife
case object HowYouDoinBob extends CharlieToBob
}
class Bob( alice: ActorRef[BobToAlice], charlie: ActorRef[BobToCharlie] )
{
import Alice._
import Bob._
import Charlie._
private val happy: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! LaughToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
sad
case SingToBob =>
alice ! LaughAtAlice
same
}
val sad: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! CryToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
same
case SingToBob =>
alice ! LaughAtAlice
happy
}
}
到目前为止,还不错。我们可以使用 ActorRef.narrow[ _X_ToBob ]
实例化 Alice 和 Charlie。但是鲍勃呢?或者更确切地说,鲍勃的另一个自我?如果我们想用 Boris 代替 Bob,Boris 不向 Charlie 抱怨而是向 Doris 抱怨,使用 DorisToBob extends Protocol[ Doris, Bob ]
,我们将无法再接收来自 Alice 的消息,因为 AliceToBob
和 [= 没有共享的超特征18=]。突然之间,BobsPersonalLife
锁定了所有 Bob Alice 可以与之交谈的对象。
用鲍里斯代替鲍勃的方法是什么?如果我们使用 ActorRef.unsafeUpcast
就会失去类型安全。如果我们在共享状态上使用两个参与者,我们就会失去线程安全性。包装 _X_ToBob(例如 Either[ AliceToBob, CharlieToBob ]
或 Dotty 的 shorthand 联合类型)也不起作用,因为包装器只是接管了 BobsPersonalLife
的角色。当我们让 DorisToBob
继承自 BobsPersonalLife
时,我们最终得到所有 Bob 的所有可能伙伴的联合,永远无法删除他们中的任何一个。
问题
我们如何才能在 Bob 内部实现 Alice 和 Charlie 之间真正类型安全的解耦?
我认为这接近于成为一个 X:Y 问题(“我如何在 Akka 中设置接口边界”与“我如何在 Akka 中实现接口边界的目标”)。
object Protocol {
sealed trait Message
sealed trait LaughReply extends Message
sealed trait MoodReply extends Message
case class Apology(from: ActorRef[Singing]) extends Message
case class Singing(from: ActorRef[Laughing]) extends Message
case class Laughing(from: ActorRef[LaughReply]) extends Message with MoodReply
case class HowYouDoin(replyTo: ActorRef[MoodReply]) extends Message with LaughReply
case class Scolding(from: ActorRef[Apology]) extends Message with LaughReply
case class Crying(from: ActorRef[HowYouDoin]) extends Message with MoodReply
}
object Alice {
val talkToBob: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Apology(from) =>
from ! Singing(context.self)
Behaviors.same
case Laughing(from) =>
from ! Scolding(context.self)
Behaviors.same
case _ => // Every other message is ignored by Alice
Behaviors.same
}
}
}
object Charlie {
val concerned: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Crying(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case Laughing(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}
object Bob {
val happy: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Laughing(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
sad
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
val sad: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Crying(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
Behaviors.same
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}
技巧基本上是通过 mixins 分解协议并在消息中编码协议状态(接受哪些消息)。只要没有人持有对 ActorRef[Message]
的引用(ActorRef
是逆变的,所以 ActorRef[LaughReply]
不是 ActorRef[Message]
),就无法发送消息target 尚未承诺接受。请注意,将 ActorRef
保持在 actor 的状态会积极地与此相反:如果您要将另一个 ActorRef
保持在 actor 的状态,那么在 IMO 中,这是一个非常强烈的信号,表明您不在都有兴趣将它们解耦。
替代方案,而不是总体协议,是为每个 Alice/Bob/Charlie/etc 制定协议。使用仅在该参与者的上下文中定义的命令和回复,并使用例如类型化的询问模式来使目标参与者的回复协议适应请求参与者的命令协议。