演员模型:一个演员需要另一个演员的信息

actor model: one actor requires information from another actor

关于演员模型,我有一点不明白。 假设你有两个演员。他们从各种来源收集和管理数据。这些消息来源通过 inbox/postbox/queue 与演员互动。例如,参与者 A 收集信号,而参与者 B 管理整个系统的信息。

在某些时候,参与者 A 必须处理其数据。在处理过程中,它无能为力。例如,无法处理新信号。作为数据处理的一部分,参与者 A 在 A 继续之前需要参与者 B 的信息。

在伪代码中:

functionOfActorA()
{
  internalQueue {
   ...doing stuff with our data
   info = actor_B.getInfo() -> what should happen here?
   ...doing stuf with our data and the obtained info
   }
}

getInfo()//function of actor B
{
  internalQueue {
    ...prepare requested info
    ... -> what should happen here?
  }
} 

如果两个 actor 应该在他们自己的队列或线程上独立操作,那么您如何根据 actor 的请求从一个 actor 获取信息到另一个 actor?

Actor A 可以通过向 Actor B 发送请求来请求信息,但响应将在消息中返回,Actor A 将在未来的某个时间收到该消息。有两种保存此信息的方法:

  1. Actor A 可以在内部存储信息以等待该响应。

  2. Actor A 可以将信息附加到它发送给 Actor B 的消息中,而 Actor B returns 该上下文(否则它会忽略)。

当 Actor A 从 Actor B 获取信息后,它可以继续处理。

这是第一种实施方式的示例,使用 Thespian Python 演员库 (http://thespianpy.com):

class ActorA(Actor):

    def __init__(self, *args, **kw):
        super(ActorA, self).__init__(args, kw)
        self.datalist = []

    def receiveMessage(self, msg, sender):

        if isinstance(msg, ActorAddress):
            self.actorB = msg

        elif isinstance(msg, WorkRequest):
            x = got_some_data(msg)
            self.datalist.append(x)
            self.send(self.actorB, NeedInfo())

        elif isinstance(msg, Info):
            x = self.datalist.pop()
            process_data(x, msg)

class ActorB(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, NeedInfo):
            i = get_info(msg)
            self.send(sender, i)

以上显示了将工作内部存储到 actor 以便稍后继续的基本功能。有几个注意事项:

  • 如果要存储多个数据项,ActorA 需要有一些方法来确定来自 ActorB 的信息适用于哪个项目。

  • ActorA 需要处理它还不知道 ActorB 地址的情况。

  • ActorA 可能应该使用某种超时来避免将工作永远保留在其内部列表中(如果 ActorB 永远不会响应)。

  • 如果 ActorA 退出或死亡,它应该在退出前对仍在数据列表中的项目做一些适当的事情。

第二种实现方式对应的简单示例如下:

class ActorA(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, ActorAddress):
            self.actorB = msg

        elif isinstance(msg, WorkRequest):
            x = got_some_data(msg)
            self.send(self.actorB, NeedInfo(x))

        elif isinstance(msg, Info):
            x = msg.orig_request.data
            process_data(x, msg)

class ActorB(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, NeedInfo):
            i = getinfo(msg)
            i.orig_request = msg
            self.send(sender, i)

在第二个示例中,创建 NeedInfo 的调用将数据存储在 NeedInfo 对象的 .data 字段中,ActorB 传回的信息附加了原始 NeedInfo 消息。这种风格的注意事项是:

  • 无需将原始WorkRequest消息独立关联到相应的Info,因为它们是相互依存的。

  • ActorA 更依赖于 ActorB 的实现以期望 ActorB 将原始消息附加到其响应中以允许 ActorA 恢复此上下文。

  • ActorA 还需要处理它还不知道 ActorB 地址的情况。

  • 如果 ActorA 退出或死亡,则它没有此未完成工作的记录以在清理之前采取任何操作(尽管死信处理程序可以执行此角色)。

以上示例相当简单,其他 Actor 模型库实现会有一些变化,但这些通用技术应该广泛适用,而不管 library/language。这两种实现方式都遵循以下通用 Actor 指南:

  1. 收到消息后,尽力而为

  2. 根据需要更新内部状态以处理下一条消息

  3. 退出消息处理程序

虽然可以编写执行阻塞操作的 Actor,但该操作会干扰 Actor 的响应能力和处理其他消息的能力(正如您正确指出的那样),因此尽可能使用消息驱动的延续阻塞呼叫。