Akka:某个 class 的演员可以成为另一个 class 的演员吗?
Akka: Can an actor of some class become an actor of a diferent class?
作为课程项目,我正在尝试实现(模拟)Raft 协议。
在这个 post 中,我根本不会使用 Raft 术语;相反,我将使用一个简化的。
该协议由多个服务器(例如 5 个)运行 执行,这些服务器可以处于三种不同的状态(A、B、C)。
服务器从“基本”种类继承了一些状态变量和行为,但它们也都有许多独特的状态变量和方法,并响应不同的消息。
在协议的某个点,处于某种状态(例如 A)的服务器需要变为另一种状态(例如 B)。
也就是说,服务器应该:
- 丢失状态 A 的状态变量和方法,获取状态 B 的状态变量和方法,但保留“基本”种类的变量。
- 停止响应发往状态 A 的消息,开始响应发往状态 B 的消息。
在 Akka 中,第 1 点可以使用 Receives 和 become() 来实现。
第 2 点是必需的,因为例如,class B 的参与者不应该访问 class A 的参与者的状态变量和方法。这旨在分离关注点,并且实现更好的代码组织。
我在实施第 2 点时面临的问题如下:
- 现在,我的实现只有一个参与者,其中包含 A 和 B 状态变量和方法。
- 我试图实现的协议要求每个服务器都必须保留对其他服务器的引用(即其他服务器的 ActorRef)。
- 我不能简单地在状态 B 中生成一个 actor,将“基本”种类的状态变量的值传递给它,然后停止旧的 actor,因为新生成的 actor 有一个新的 ActorRef,并且其他服务器对此一无所知,它们将继续使用旧的 ActorRef 发送消息(因此,新的 actor 不会收到任何东西,并且双方都超时)。
一种规避该问题的方法是,新生成的 actor 通过向其他 actor(包括其旧的 ActorRef)发送消息来“宣传”自己。
然而,同样由于协议的原因,其他服务器可能暂时不可用(即它们已崩溃),因此它们可能无法接收和处理广告。
在项目中,我必须使用AbstractActor的扩展,而不是FSM(最终状态机),并且必须使用Java。
是否有任何 Akka 模式或功能可以解决此用例?感谢您的任何见解。下面是一个简化的例子。
public abstract class BaseActor extends AbstractActor {
protected int x = 0;
// some state variables and methods that make sense for both A and B
@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("x", msg -> {
System.out.println(x);
x++;
})
.build();
}
}
public class A extends BaseActor {
protected int a = 10;
// many other state variables and methods that are own of A and do NOT make sense to B
@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("a", msg -> {
System.out.println(a);
})
.matchEquals("change", msg -> {
// here I want A to become B, but maintain value of x
})
.build()
.orElse(super.createReceive());
}
}
public class B extends BaseActor {
protected int b = 20;
// many other state variables and methods that are own of B and do NOT make sense to A
@Override
public AbstractActor.Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("b", msg -> {
System.out.println(b);
})
.matchEquals("change", msg -> {
// here I want B to become A, but maintain value of x
})
.build()
.orElse(super.createReceive());
}
}
public class Example {
public static void main(String[] args) {
var system = ActorSystem.create("example");
// actor has class A
var actor = system.actorOf(Props.create(A.class));
actor.tell("x", ActorRef.noSender()); // prints "0"
actor.tell("a", ActorRef.noSender()); // prints "10"
// here, the actor should become of class B,
// preserving the value of x, a variable of the "base" kind
actor.tell("change", ActorRef.noSender());
// actor has class B
actor.tell("x", ActorRef.noSender()); // should print "1"
actor.tell("b", ActorRef.noSender()); // should print "20"
}
}
诚然,我的 Java 已经生锈了,但是例如,这个 actor(或非常类似的东西...)将接受字符串,直到它收到 Lock
消息,然后它可以查询它在被锁定之前收到了多少个不同的字符串。因此,在它获取的第一个 Receive
中,它跟踪接收到的字符串的 Set
以进行重复数据删除。在 Lock
上,它过渡到第二个 Receive
,它不包含 Set
(只是一个 Integer
字段)并忽略 String
和 Lock
消息。
import akka.japi.JavaPartialFunction;
import java.util.HashSet;
import scala.runtime.BoxedUnit;
public class StringCounter extends AbstractActor {
public StringCounter() {}
public static class Lock {
private Lock() {}
public static final Lock INSTANCE = new Lock();
}
public static class Query {
private Query() {}
public static final Query INSTANCE = new Query();
}
/** The taking in Strings state */
public class AcceptingStrings extends JavaPartialFunction<Object, BoxedUnit> {
private HashSet<String> strings;
public AcceptingStrings() {
strings = new HashSet<String>();
}
public BoxedUnit apply(Object msg, boolean isCheck) {
if (msg instanceof String) {
if (!isCheck) {
strings.add(msg);
}
} else if (msg instanceof Lock) {
if (!isCheck) {
context().become(new Queryable(strings.size()), true);
}
} else {
// not handling any other message
throw noMatch();
}
return BoxedUnit.UNIT;
}
}
/** The responding to queries state */
public class Queryable extends JavaPartialFunction<Object, BoxedUnit> {
private Integer ans;
public Queryable(int answer) {
ans = Integer.valueOf(answer);
}
public BoxedUnit apply(Object msg, boolean isCheck) {
if (msg instanceof Query) {
if (!isCheck) {
getSender().tell(ans, getSelf());
}
} else {
// not handling any other message
throw noMatch();
}
return BoxedUnit.UNIT;
}
}
@Override
public Receive createReceive() {
return new Receive(new AcceptingStrings());
}
}
请注意,在 Queryable
中,该集合早已不复存在。需要注意的一件事是 JavaPartialFunction
通常会调用 apply
一次 isCheck
设置为 true 并且如果该调用不抛出异常 return noMatch()
,将 isCheck
设置为 false 再次“真实地”调用它。因此,在 isCheck
为真的情况下,您需要注意除了 throw noMatch()
或 return 之外不要做任何事情。
这种模式在幕后与 Akka Typed(尤其是函数 API)中发生的事情非常相似。
希望这能阐明这种方法。当然,您的导师有可能不会接受这一点,但在那种情况下,可能值得反驳的论点是:
- 在 actor 模型中,状态和行为实际上是同一件事
- 的所有功能都包含在
AbstractActor
中
我也不一定建议在 Java Akka 代码中通常使用这种方法(AbstractActor
在其字段中有状态感觉更多 Java-y)。
这是它的外观的草图实现。
- 您为每个州建模一个单独的 class:
public class BaseState {
//base state fields/getters/setters
}
public class StateA {
BaseState baseState;
//state A fields/getters/setters
..
//factory methods
public static StateA fromBase(BaseState baseState) {...}
//if you need to go from StateB to StateA:
public static StateA fromStateB(StateB stateB) {...}
}
public class StateB {
BaseState baseState;
//state B fields/getters/setters
//factory methods
public static StateB fromBase(BaseState baseState) {...}
//if you need to go from StateA to StateB:
public static StateB fromStateA(StateA stateA) {...}
}
- 然后在您的 Actor 中,您可以接收为 A 和 B 定义的函数,并将其初始化为 A 或 B,具体取决于哪一个是初始的
private static class MyActor extends AbstractActor
{
private AbstractActor.Receive receive4StateA(StateA stateA)
{
return new ReceiveBuilder()
.matchEquals("a", msg -> stateA.setSomeProperty(msg))
.matchEquals("changeToB", msg -> getContext().become(
receive4StateB(StateB.fromStateA(stateA))))
.build();
}
private AbstractActor.Receive receive4StateB(StateB stateB)
{
return new ReceiveBuilder()
.matchEquals("b", msg -> stateB.setSomeProperty(msg))
.matchEquals("changeToA", msg -> getContext().become(
receive4StateA(StateA.fromStateB(stateB))))
.build();
}
//assuming stateA is the initial state
@Override
public AbstractActor.Receive createReceive()
{
return receive4StateA(StateA.fromBase(new BaseState()));
}
}
作为课程项目,我正在尝试实现(模拟)Raft 协议。 在这个 post 中,我根本不会使用 Raft 术语;相反,我将使用一个简化的。
该协议由多个服务器(例如 5 个)运行 执行,这些服务器可以处于三种不同的状态(A、B、C)。 服务器从“基本”种类继承了一些状态变量和行为,但它们也都有许多独特的状态变量和方法,并响应不同的消息。 在协议的某个点,处于某种状态(例如 A)的服务器需要变为另一种状态(例如 B)。 也就是说,服务器应该:
- 丢失状态 A 的状态变量和方法,获取状态 B 的状态变量和方法,但保留“基本”种类的变量。
- 停止响应发往状态 A 的消息,开始响应发往状态 B 的消息。
在 Akka 中,第 1 点可以使用 Receives 和 become() 来实现。
第 2 点是必需的,因为例如,class B 的参与者不应该访问 class A 的参与者的状态变量和方法。这旨在分离关注点,并且实现更好的代码组织。
我在实施第 2 点时面临的问题如下:
- 现在,我的实现只有一个参与者,其中包含 A 和 B 状态变量和方法。
- 我试图实现的协议要求每个服务器都必须保留对其他服务器的引用(即其他服务器的 ActorRef)。
- 我不能简单地在状态 B 中生成一个 actor,将“基本”种类的状态变量的值传递给它,然后停止旧的 actor,因为新生成的 actor 有一个新的 ActorRef,并且其他服务器对此一无所知,它们将继续使用旧的 ActorRef 发送消息(因此,新的 actor 不会收到任何东西,并且双方都超时)。
一种规避该问题的方法是,新生成的 actor 通过向其他 actor(包括其旧的 ActorRef)发送消息来“宣传”自己。 然而,同样由于协议的原因,其他服务器可能暂时不可用(即它们已崩溃),因此它们可能无法接收和处理广告。
在项目中,我必须使用AbstractActor的扩展,而不是FSM(最终状态机),并且必须使用Java。
是否有任何 Akka 模式或功能可以解决此用例?感谢您的任何见解。下面是一个简化的例子。
public abstract class BaseActor extends AbstractActor {
protected int x = 0;
// some state variables and methods that make sense for both A and B
@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("x", msg -> {
System.out.println(x);
x++;
})
.build();
}
}
public class A extends BaseActor {
protected int a = 10;
// many other state variables and methods that are own of A and do NOT make sense to B
@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("a", msg -> {
System.out.println(a);
})
.matchEquals("change", msg -> {
// here I want A to become B, but maintain value of x
})
.build()
.orElse(super.createReceive());
}
}
public class B extends BaseActor {
protected int b = 20;
// many other state variables and methods that are own of B and do NOT make sense to A
@Override
public AbstractActor.Receive createReceive() {
return new ReceiveBuilder()
.matchEquals("b", msg -> {
System.out.println(b);
})
.matchEquals("change", msg -> {
// here I want B to become A, but maintain value of x
})
.build()
.orElse(super.createReceive());
}
}
public class Example {
public static void main(String[] args) {
var system = ActorSystem.create("example");
// actor has class A
var actor = system.actorOf(Props.create(A.class));
actor.tell("x", ActorRef.noSender()); // prints "0"
actor.tell("a", ActorRef.noSender()); // prints "10"
// here, the actor should become of class B,
// preserving the value of x, a variable of the "base" kind
actor.tell("change", ActorRef.noSender());
// actor has class B
actor.tell("x", ActorRef.noSender()); // should print "1"
actor.tell("b", ActorRef.noSender()); // should print "20"
}
}
诚然,我的 Java 已经生锈了,但是例如,这个 actor(或非常类似的东西...)将接受字符串,直到它收到 Lock
消息,然后它可以查询它在被锁定之前收到了多少个不同的字符串。因此,在它获取的第一个 Receive
中,它跟踪接收到的字符串的 Set
以进行重复数据删除。在 Lock
上,它过渡到第二个 Receive
,它不包含 Set
(只是一个 Integer
字段)并忽略 String
和 Lock
消息。
import akka.japi.JavaPartialFunction;
import java.util.HashSet;
import scala.runtime.BoxedUnit;
public class StringCounter extends AbstractActor {
public StringCounter() {}
public static class Lock {
private Lock() {}
public static final Lock INSTANCE = new Lock();
}
public static class Query {
private Query() {}
public static final Query INSTANCE = new Query();
}
/** The taking in Strings state */
public class AcceptingStrings extends JavaPartialFunction<Object, BoxedUnit> {
private HashSet<String> strings;
public AcceptingStrings() {
strings = new HashSet<String>();
}
public BoxedUnit apply(Object msg, boolean isCheck) {
if (msg instanceof String) {
if (!isCheck) {
strings.add(msg);
}
} else if (msg instanceof Lock) {
if (!isCheck) {
context().become(new Queryable(strings.size()), true);
}
} else {
// not handling any other message
throw noMatch();
}
return BoxedUnit.UNIT;
}
}
/** The responding to queries state */
public class Queryable extends JavaPartialFunction<Object, BoxedUnit> {
private Integer ans;
public Queryable(int answer) {
ans = Integer.valueOf(answer);
}
public BoxedUnit apply(Object msg, boolean isCheck) {
if (msg instanceof Query) {
if (!isCheck) {
getSender().tell(ans, getSelf());
}
} else {
// not handling any other message
throw noMatch();
}
return BoxedUnit.UNIT;
}
}
@Override
public Receive createReceive() {
return new Receive(new AcceptingStrings());
}
}
请注意,在 Queryable
中,该集合早已不复存在。需要注意的一件事是 JavaPartialFunction
通常会调用 apply
一次 isCheck
设置为 true 并且如果该调用不抛出异常 return noMatch()
,将 isCheck
设置为 false 再次“真实地”调用它。因此,在 isCheck
为真的情况下,您需要注意除了 throw noMatch()
或 return 之外不要做任何事情。
这种模式在幕后与 Akka Typed(尤其是函数 API)中发生的事情非常相似。
希望这能阐明这种方法。当然,您的导师有可能不会接受这一点,但在那种情况下,可能值得反驳的论点是:
- 在 actor 模型中,状态和行为实际上是同一件事
- 的所有功能都包含在
AbstractActor
中
我也不一定建议在 Java Akka 代码中通常使用这种方法(AbstractActor
在其字段中有状态感觉更多 Java-y)。
这是它的外观的草图实现。
- 您为每个州建模一个单独的 class:
public class BaseState {
//base state fields/getters/setters
}
public class StateA {
BaseState baseState;
//state A fields/getters/setters
..
//factory methods
public static StateA fromBase(BaseState baseState) {...}
//if you need to go from StateB to StateA:
public static StateA fromStateB(StateB stateB) {...}
}
public class StateB {
BaseState baseState;
//state B fields/getters/setters
//factory methods
public static StateB fromBase(BaseState baseState) {...}
//if you need to go from StateA to StateB:
public static StateB fromStateA(StateA stateA) {...}
}
- 然后在您的 Actor 中,您可以接收为 A 和 B 定义的函数,并将其初始化为 A 或 B,具体取决于哪一个是初始的
private static class MyActor extends AbstractActor
{
private AbstractActor.Receive receive4StateA(StateA stateA)
{
return new ReceiveBuilder()
.matchEquals("a", msg -> stateA.setSomeProperty(msg))
.matchEquals("changeToB", msg -> getContext().become(
receive4StateB(StateB.fromStateA(stateA))))
.build();
}
private AbstractActor.Receive receive4StateB(StateB stateB)
{
return new ReceiveBuilder()
.matchEquals("b", msg -> stateB.setSomeProperty(msg))
.matchEquals("changeToA", msg -> getContext().become(
receive4StateA(StateA.fromStateB(stateB))))
.build();
}
//assuming stateA is the initial state
@Override
public AbstractActor.Receive createReceive()
{
return receive4StateA(StateA.fromBase(new BaseState()));
}
}