Akka:某个 class 的演员可以成为另一个 class 的演员吗?

Akka: Can an actor of some class become an actor of a diferent class?

作为课程项目,我正在尝试实现(模拟)Raft 协议。 在这个 post 中,我根本不会使用 Raft 术语;相反,我将使用一个简化的。

该协议由多个服务器(例如 5 个)运行 执行,这些服务器可以处于三种不同的状态(A、B、C)。 服务器从“基本”种类继承了一些状态变量和行为,但它们也都有许多独特的状态变量和方法,并响应不同的消息。 在协议的某个点,处于某种状态(例如 A)的服务器需要变为另一种状态(例如 B)。 也就是说,服务器应该:

  1. 丢失状态 A 的状态变量和方法,获取状态 B 的状态变量和方法,但保留“基本”种类的变量。
  2. 停止响应发往状态 A 的消息,开始响应发往状态 B 的消息。

在 Akka 中,第 1 点可以使用 Receives 和 become() 来实现。

第 2 点是必需的,因为例如,class B 的参与者不应该访问 class A 的参与者的状态变量和方法。这旨在分离关注点,并且实现更好的代码组织。

我在实施第 2 点时面临的问题如下:

一种规避该问题的方法是,新生成的 actor 通过向其他 actor(包括其旧的 ActorRef)发送消息来“宣传”自己。 然而,同样由于协议的原因,其他服务器可能暂时不可用(即它们已崩溃),因此它们可能无法接收和处理广告。

在项目中,我必须使用AbstractActor的扩展,而不是FSM(最终状态机),并且必须使用Java。

是否有任何 Akka 模式或功能可以解决此用例?感谢您的任何见解。下面是一个简化的例子。

public abstract class BaseActor extends AbstractActor {
    protected int x = 0;
    // some state variables and methods that make sense for both A and B

    @Override
    public Receive createReceive() {
        return new ReceiveBuilder()
                .matchEquals("x", msg -> {
                    System.out.println(x);
                    x++;
                })
                .build();
    }
}

public class A extends BaseActor {
    protected int a = 10;
    // many other state variables and methods that are own of A and do NOT make sense to B

    @Override
    public Receive createReceive() {
        return new ReceiveBuilder()
                .matchEquals("a", msg -> {
                    System.out.println(a);
                })
                .matchEquals("change", msg -> {
                    // here I want A to become B, but maintain value of x
                })
                .build()
                .orElse(super.createReceive());
    }
}

public class B extends BaseActor {
    protected int b = 20;
    // many other state variables and methods that are own of B and do NOT make sense to A

    @Override
    public AbstractActor.Receive createReceive() {
        return new ReceiveBuilder()
                .matchEquals("b", msg -> {
                    System.out.println(b);
                })
                .matchEquals("change", msg -> {
                    // here I want B to become A, but maintain value of x
                })
                .build()
                .orElse(super.createReceive());
    }
}

public class Example {
    public static void main(String[] args) {
        var system = ActorSystem.create("example");

        // actor has class A
        var actor = system.actorOf(Props.create(A.class));
        actor.tell("x", ActorRef.noSender()); // prints "0"
        actor.tell("a", ActorRef.noSender()); // prints "10"

        // here, the actor should become of class B,
        // preserving the value of x, a variable of the "base" kind
        actor.tell("change", ActorRef.noSender());

        // actor has class B
        actor.tell("x", ActorRef.noSender()); // should print "1"
        actor.tell("b", ActorRef.noSender()); // should print "20"
    }
}

诚然,我的 Java 已经生锈了,但是例如,这个 actor(或非常类似的东西...)将接受字符串,直到它收到 Lock 消息,然后它可以查询它在被锁定之前收到了多少个不同的字符串。因此,在它获取的第一个 Receive 中,它跟踪接收到的字符串的 Set 以进行重复数据删除。在 Lock 上,它过渡到第二个 Receive,它不包含 Set(只是一个 Integer 字段)并忽略 StringLock消息。

import akka.japi.JavaPartialFunction;
import java.util.HashSet;
import scala.runtime.BoxedUnit;

public class StringCounter extends AbstractActor {
  public StringCounter() {}

  public static class Lock {
    private Lock() {}
    public static final Lock INSTANCE = new Lock();
  }

  public static class Query {
    private Query() {}
    public static final Query INSTANCE = new Query();
  }

  /** The taking in Strings state */
  public class AcceptingStrings extends JavaPartialFunction<Object, BoxedUnit> {
    private HashSet<String> strings;

    public AcceptingStrings() {
      strings = new HashSet<String>();
    }

    public BoxedUnit apply(Object msg, boolean isCheck) {
      if (msg instanceof String) {
        if (!isCheck) {
          strings.add(msg);
        }
      } else if (msg instanceof Lock) {
        if (!isCheck) {
          context().become(new Queryable(strings.size()), true);
        }
      } else {
        // not handling any other message
        throw noMatch();
      }

      return BoxedUnit.UNIT;
    }
  }

  /** The responding to queries state */
  public class Queryable extends JavaPartialFunction<Object, BoxedUnit> {
    private Integer ans;

    public Queryable(int answer) {
      ans = Integer.valueOf(answer);
    }

    public BoxedUnit apply(Object msg, boolean isCheck) {
      if (msg instanceof Query) {
        if (!isCheck) {
          getSender().tell(ans, getSelf());
        }
      } else {
        // not handling any other message
        throw noMatch();
      }

      return BoxedUnit.UNIT;
    }
  }

  @Override
  public Receive createReceive() {
    return new Receive(new AcceptingStrings());
  }
}

请注意,在 Queryable 中,该集合早已不复存在。需要注意的一件事是 JavaPartialFunction 通常会调用 apply 一次 isCheck 设置为 true 并且如果该调用不抛出异常 return noMatch(),将 isCheck 设置为 false 再次“真实地”调用它。因此,在 isCheck 为真的情况下,您需要注意除了 throw noMatch() 或 return 之外不要做任何事情。

这种模式在幕后与 Akka Typed(尤其是函数 API)中发生的事情非常相似。

希望这能阐明这种方法。当然,您的导师有可能不会接受这一点,但在那种情况下,可能值得反驳的论点是:

  • 在 actor 模型中,状态和行为实际上是同一件事
  • 的所有功能都包含在AbstractActor

我也不一定建议在 Java Akka 代码中通常使用这种方法(AbstractActor 在其字段中有状态感觉更多 Java-y)。

这是它的外观的草图实现。

  1. 您为每个州建模​​一个单独的 class:
public class BaseState {
  //base state fields/getters/setters
}
public class StateA {
  BaseState baseState;
  //state A fields/getters/setters
  ..

  //factory methods
  public static StateA fromBase(BaseState baseState) {...}

  //if you need to go from StateB to StateA:
  public static StateA fromStateB(StateB stateB) {...}
}
public class StateB {
  BaseState baseState;
  //state B fields/getters/setters

  //factory methods
  public static StateB fromBase(BaseState baseState) {...}

  //if you need to go from StateA to StateB:
  public static StateB fromStateA(StateA stateA) {...}
}
  1. 然后在您的 Actor 中,您可以接收为 A 和 B 定义的函数,并将其初始化为 A 或 B,具体取决于哪一个是初始的


private static class MyActor extends AbstractActor
  {
    private AbstractActor.Receive receive4StateA(StateA stateA)
    {
      return new ReceiveBuilder()
        .matchEquals("a", msg -> stateA.setSomeProperty(msg))
        .matchEquals("changeToB", msg -> getContext().become(
          receive4StateB(StateB.fromStateA(stateA))))
        .build();
    }

    private AbstractActor.Receive receive4StateB(StateB stateB)
    {
      return new ReceiveBuilder()
        .matchEquals("b", msg -> stateB.setSomeProperty(msg))
        .matchEquals("changeToA", msg -> getContext().become(
          receive4StateA(StateA.fromStateB(stateB))))
        .build();
    }

    //assuming stateA is the initial state
    @Override
    public AbstractActor.Receive createReceive()
    {
      return receive4StateA(StateA.fromBase(new BaseState()));
    }
  }