序列化程序是从 Akka 消息中删除共享状态的正确位置吗?

Are serializers the right spot to remove shared state from Akka messages?

我正在研究分布式算法,并决定使用 Akka 跨机器扩展它。机器需要非常频繁地交换消息,这些消息引用一些存在于每台机器上的不可变对象。因此,"compress" 消息似乎是明智的,因为共享的、复制的对象不应在消息中序列化。这不仅可以节省网络带宽,还可以避免在反序列化消息时在接收端创建重复对象。

现在,我的问题是如何正确执行此操作。到目前为止,我能想到两个选择:

  1. 在 "business layer" 上处理这个问题,即,将我的原始消息对象转换为一些引用对象,这些对象通过一些符号引用替换对共享、复制对象的引用。然后,我会发送那些参考对象而不是原始消息。将其视为用 URL 替换一些实际的 Web 资源。这样做在编码方面似乎相当直接,但它也将序列化问题拖入了实际的业务逻辑中。

  2. 编写可识别共享、复制对象的自定义序列化程序。在我的例子中,这个解决方案可以通过序列化程序将复制的共享对象作为全局状态引入到参与者系统中。但是,Akka 文档没有描述如何以编程方式添加自定义序列化程序,这对于使用序列化程序编织共享对象是必需的。另外,我可以想象有几个原因,为什么不鼓励这样的解决方案。所以,我在这里问一下。

非常感谢!

可以编写您自己的自定义序列化器并让它们做各种奇怪的事情,然后您可以像往常一样在配置级别绑定它们:

class MyOwnSerializer extends Serializer {

  // If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
  // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
  // Get a logger using:
  // private val logger = Logging(actorSystem, this)

  // This is whether "fromBinary" requires a "clazz" or not
  def includeManifest: Boolean = true

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 40 is reserved by Akka itself
  def identifier = 1234567

  // "toBinary" serializes the given object to an Array of Bytes
  def toBinary(obj: AnyRef): Array[Byte] = {
    // Put the code that serializes the object here
    //#...
    Array[Byte]()
    //#...
  }

  // "fromBinary" deserializes the given array,
  // using the type hint (if any, see "includeManifest" above)
  def fromBinary(
    bytes: Array[Byte],
    clazz: Option[Class[_]]): AnyRef = {
    // Put your code that deserializes here
    //#...
    null
    //#...
  }
}

但这提出了一个重要的问题:如果你的消息都引用了已经在机器上共享的数据,你为什么要在消息中放入指向对象的指针(非常糟糕!消息应该是不可变的,而指针不是!),而不是某种不可变的字符串 objectId(有点像您的选项 1)?在保持消息的不变性方面,这是一个更好的选择,并且您的业务逻辑几乎没有变化(只需在共享状态存储上放置一个包装器)

有关详细信息,请参阅 the documentation

我最终采用了 Diego 提出的解决方案,并希望分享更多关于我的推理和解决方案的细节。

首先,我也赞成选项 1(在业务层处理 "compaction" 消息),原因如下:

  1. 序列化器对于 actor 系统是全局的。使它们有状态实际上是对 Akka 哲学最严重的违反,因为它违背了演员的行为和状态的封装。
  2. 无论如何都必须预先创建序列化程序(即使在添加它们时 "programatically")。
  3. 在设计方面,可以争辩说“消息压缩也不是序列化程序的责任。从严格意义上讲,序列化只是将特定于运行时的数据转换为紧凑的、可交换的表示形式。改变什么不过,序列化并不是序列化程序的任务。

在确定了这一点之后,我仍然努力将 "message compaction" 与 actors 中的实际业务逻辑明确分开。我想出了一种在 Scala 中执行此操作的巧妙方法,我想在这里分享。基本思想是使消息本身看起来像正常情况 class 但仍然允许这些消息本身 "compactify"。这是一个抽象的例子:

class Sender extends ActorRef {
   def context: SharedContext = ... // This is the shared data present on every node.

   // ...

   def someBusinessLogic(receiver: ActorRef) {
     val someData = computeData
     receiver ! MyMessage(someData)
   }
}

class Receiver extends ActorRef {
   implicit def context: SharedContext = ... // This is the shared data present on every node.

   def receiver = {
     case MyMessage(someData) =>
       // ...
   }
}

object Receiver {
  object MyMessage {
    def apply(someData: SomeData) = MyCompactMessage(someData: SomeData)
    def unapply(myCompactMessage: MyCompactMessage)(implicit context: SharedContext)
    : Option[SomeData] =
      Some(myCompactMessage.someData(context))
  }
}

如您所见,发送方和接收方代码感觉就像使用大小写 class,事实上,MyMessage 可能是大小写 class。 但是,通过手动实现 applyunapply,可以插入自己的 "compactification" 逻辑,还可以隐式注入执行 "uncompactification" 所需的共享数据,而无需触及发送者和接收者。对于定义 MyCompactMessage,我发现 Protocol Buffers 特别适合,因为它已经是 Akka 的依赖项并且在 space 和计算方面很高效,但任何其他解决方案都可以。