akka persist函数不是每次都处理
akka persist function is not handled every time
我在集群中使用带有分片的 akka 的 PersistentActor 来跟踪我的状态。我有一个 'Room',我可以通过以下代码更新它:
case UpdateRoom(id, room, userId) => ((ret: ActorRef) => {
(userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map { role =>
if (role == "owner") {
state = Some(room)
ret ! Success(room)
eventRegion ! RoomEventPackage(id, RoomUpdated(room))
println("works")
persist(RoomUpdated(room))(e => println("this doesn't always fire")
} else {
ret ! Failure(InsufficientRights(role, "Update Room"))
}
}
问题是 persist 仅每隔一段时间工作一次,而该功能的其余部分按预期工作。 ("works" 每次都打印一次,"this doesn't always fire" 每隔一段时间打印一次,然后打印两次)。
我总是必须两次触发更新命令来存储我的事件,但它似乎在我触发命令的两次都被存储了。
我是否遗漏了 akka 坚持的重要部分?
我认为您在 Actor
的世界中犯了一个严重的错误:从外部访问 actor(可变)状态。在您的情况下,这在 ask
/?
:
返回的 Future
的回调中发生了两次
- 更新状态时:
state = Some(room)
- 调用时
persist
处理从 Actor
内部请求并随后修改 actor 状态的唯一安全方法是从 ask 的回调中向同一个 actor 发送消息,为此,您可以使用 pipeTo
.
使用您的代码的简化版本来说明:
case UpdateRoom(id, room, userId) =>
val answer = (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map(role => RoleForRoom(id, room, userId, role))
answer piepTo self
case RoleForRoom(id, room, userId, room) =>
if (role == "owner") {
state = Some(room)
eventRegion ! RoomEventPackage(id, RoomUpdated(room))
persist(RoomUpdated(room))(e => println("this is safe"))
}
另请参阅:https://doc.akka.io/docs/akka/2.5.6/scala/general/jmm.html#actors-and-shared-mutable-state
我在集群中使用带有分片的 akka 的 PersistentActor 来跟踪我的状态。我有一个 'Room',我可以通过以下代码更新它:
case UpdateRoom(id, room, userId) => ((ret: ActorRef) => {
(userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map { role =>
if (role == "owner") {
state = Some(room)
ret ! Success(room)
eventRegion ! RoomEventPackage(id, RoomUpdated(room))
println("works")
persist(RoomUpdated(room))(e => println("this doesn't always fire")
} else {
ret ! Failure(InsufficientRights(role, "Update Room"))
}
}
问题是 persist 仅每隔一段时间工作一次,而该功能的其余部分按预期工作。 ("works" 每次都打印一次,"this doesn't always fire" 每隔一段时间打印一次,然后打印两次)。 我总是必须两次触发更新命令来存储我的事件,但它似乎在我触发命令的两次都被存储了。
我是否遗漏了 akka 坚持的重要部分?
我认为您在 Actor
的世界中犯了一个严重的错误:从外部访问 actor(可变)状态。在您的情况下,这在 ask
/?
:
Future
的回调中发生了两次
- 更新状态时:
state = Some(room)
- 调用时
persist
处理从 Actor
内部请求并随后修改 actor 状态的唯一安全方法是从 ask 的回调中向同一个 actor 发送消息,为此,您可以使用 pipeTo
.
使用您的代码的简化版本来说明:
case UpdateRoom(id, room, userId) =>
val answer = (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map(role => RoleForRoom(id, room, userId, role))
answer piepTo self
case RoleForRoom(id, room, userId, room) =>
if (role == "owner") {
state = Some(room)
eventRegion ! RoomEventPackage(id, RoomUpdated(room))
persist(RoomUpdated(room))(e => println("this is safe"))
}
另请参阅:https://doc.akka.io/docs/akka/2.5.6/scala/general/jmm.html#actors-and-shared-mutable-state