分布式播放框架应用程序中远程参与者系统之间的交叉通信
Cross communicate between remote actor systems in distributed play framework application
我正在尝试找出构建我的应用程序的最佳方式,以便我可以以冗余方式从我的播放框架应用程序发送推送通知。
我想实现一个 "rest period" 以在用户修改数据 30 秒后向移动设备发送推送通知。如果用户在这 30 秒内进行了另一次修改,则需要取消原始通知并替换为应在最新修改后 30 秒发送的新通知,依此类推。
问题是我的 API 后端需要相互通信以确保它们不会在 30 秒内发送多个通知,因为它们是负载平衡的。例如:
- User1 进行了修改,该修改被发送到 API Server1。 30 秒后触发通知。
- User1 在 5 秒后对同一条记录进行了第二次修改,最终被路由到 API Server2。另一个通知被触发在 30 秒后发送,因为它不知道 Server1 收到的信息。
这是不正确的行为 - 用户 1 应该只收到一个通知,因为修改发生时没有数据 "at rest" 30 秒。
由于我对Akka不是特别熟悉,这似乎是一个很好的学习机会。看来我可以用 Akka 远程处理来解决这个问题。
这是我能想到的最简单的架构:
- 在 API 的每个实例中创建一个 akka 系统 ("notifications"),使用路由器向每个 API 实例发送消息,每个实例都有一个 Akka actor ("notificationActor")
我的 application.conf 看起来像这样:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/router {
router = round-robin-group
routees.paths = [
"akka.tcp://notifications@server1:2552/user/notificationActor",
"akka.tcp://notifications@server2:2552/user/notificationActor"]
}
}
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "server1" // set to server1 or server 2 upon deployment
port = 2552
}
}
}
我正在这样设置系统、参与者和路由器:
// the system, instantiated once per server
private val system = ActorSystem("notifications")
// the local actor, instantiated once per server
private val notificationActor = system.actorOf(Props[NotificationActor], name = "notificationActor")
// the router, instantiated once per server
private val router = system.actorOf(FromConfig.props(Props[NotificationActor]), name = "router")
当我需要发送通知时,我告诉我的 actor 安排它。这样每个系统都可以在 key/value 对中保留 Cancellable 实例,并在数据在不同服务器上更新时取消通知:
Client.scala(近似值,可能有错别字)
def updateData(data: DbData) = {
// update data in db
NotificationController.sendMessage(Notification(data.key))
}
NotificationController.scala(近似值,可能有错别字)
def sendMessage(pushNotification: Notification) = {
// cancel all notifications related to this data on all servers
router ! Broadcast(CancelNotification(pushNotification))
// schedule the new notification on the next available actor
router ! ScheduleNotification(pushNotification)
}
CancelNotification.scala(近似值,可能有错别字)
case class CancelNotification(pushNotification: Notification)
ScheduleNotification.scala(近似值,可能有错别字)
case class ScheduleNotification(pushNotification: Notification)
NotificationActor.scala(近似值,可能有错别字)
val cancellableMap: Map[String, Cancellable] = // (new concurrent hash map)
def receive: Receive = {
case ScheduleNotification(pushNotification) => //uses - this.context.system.scheduler.scheduleOnce and stores the key/Cancellable pair in cancellableMap
case CancelNotification(pushNotification) => //use stored Cancellable to cancel notification, if present
case Notification(key) => //actually send the push notification
}
这在本地运行良好,但一旦我将它部署到我的测试环境(多台机器),所有其他消息似乎都丢失了。我假设这是因为它试图将这些消息发送到 Server2,但我在这两个应用程序的日志文件中都没有看到任何错误。我试图在我的 akka 配置中添加更多日志记录,但我没有在 logs/application.log(默认播放框架日志)中看到任何额外的输出:
akka {
loglevel = "DEBUG"
log-config-on-start = on
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/router {
router = round-robin-group
routees.paths = [
"akka.tcp://notifications@server1:2552/user/notificationActor",
"akka.tcp://notifications@server2:2552/user/notificationActor"]
}
}
debug {
unhandled = on
}
}
remote {
log-sent-messages = on
log-received-messages = on
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "server1" // set to server1 or server 2 upon deployment
port = 2552
}
}
}
为什么 Server2 收不到消息?我可以在每个实例上使用来自所有服务器的参与者实例化一个参与者系统吗?他们应该能够交叉沟通吗?
此外,如果我将其过于复杂化,我愿意接受其他解决方案。如果我能让它工作的话,这似乎是最简单的方法。
我想我明白了。这个架构似乎可以工作,但我有两个问题,我通过 运行 从我的本地机器解决了它,并将 server1 和 server2 添加到我的配置中。
应用程序启动时我没有实例化我的 actor 系统 - 我使用的是惰性 vals,这意味着 actor 还没有准备好,因为它没有收到会导致它准备好的请求创建。换句话说,Server2 上的 notificationActor 实例没有监听,因为它还没有初始化。
为了解决这个问题,我将所有与 akka 相关的组件(系统、actor 和路由器)的初始化移动到我的 GlobalSettings class 的 onStart 方法中,以便在播放应用程序启动后立即开始.这是主要问题。
我的邮件无法序列化。
我正在尝试找出构建我的应用程序的最佳方式,以便我可以以冗余方式从我的播放框架应用程序发送推送通知。
我想实现一个 "rest period" 以在用户修改数据 30 秒后向移动设备发送推送通知。如果用户在这 30 秒内进行了另一次修改,则需要取消原始通知并替换为应在最新修改后 30 秒发送的新通知,依此类推。
问题是我的 API 后端需要相互通信以确保它们不会在 30 秒内发送多个通知,因为它们是负载平衡的。例如:
- User1 进行了修改,该修改被发送到 API Server1。 30 秒后触发通知。
- User1 在 5 秒后对同一条记录进行了第二次修改,最终被路由到 API Server2。另一个通知被触发在 30 秒后发送,因为它不知道 Server1 收到的信息。
这是不正确的行为 - 用户 1 应该只收到一个通知,因为修改发生时没有数据 "at rest" 30 秒。
由于我对Akka不是特别熟悉,这似乎是一个很好的学习机会。看来我可以用 Akka 远程处理来解决这个问题。
这是我能想到的最简单的架构:
- 在 API 的每个实例中创建一个 akka 系统 ("notifications"),使用路由器向每个 API 实例发送消息,每个实例都有一个 Akka actor ("notificationActor")
我的 application.conf 看起来像这样:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/router {
router = round-robin-group
routees.paths = [
"akka.tcp://notifications@server1:2552/user/notificationActor",
"akka.tcp://notifications@server2:2552/user/notificationActor"]
}
}
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "server1" // set to server1 or server 2 upon deployment
port = 2552
}
}
}
我正在这样设置系统、参与者和路由器:
// the system, instantiated once per server
private val system = ActorSystem("notifications")
// the local actor, instantiated once per server
private val notificationActor = system.actorOf(Props[NotificationActor], name = "notificationActor")
// the router, instantiated once per server
private val router = system.actorOf(FromConfig.props(Props[NotificationActor]), name = "router")
当我需要发送通知时,我告诉我的 actor 安排它。这样每个系统都可以在 key/value 对中保留 Cancellable 实例,并在数据在不同服务器上更新时取消通知:
Client.scala(近似值,可能有错别字)
def updateData(data: DbData) = {
// update data in db
NotificationController.sendMessage(Notification(data.key))
}
NotificationController.scala(近似值,可能有错别字)
def sendMessage(pushNotification: Notification) = {
// cancel all notifications related to this data on all servers
router ! Broadcast(CancelNotification(pushNotification))
// schedule the new notification on the next available actor
router ! ScheduleNotification(pushNotification)
}
CancelNotification.scala(近似值,可能有错别字)
case class CancelNotification(pushNotification: Notification)
ScheduleNotification.scala(近似值,可能有错别字)
case class ScheduleNotification(pushNotification: Notification)
NotificationActor.scala(近似值,可能有错别字)
val cancellableMap: Map[String, Cancellable] = // (new concurrent hash map)
def receive: Receive = {
case ScheduleNotification(pushNotification) => //uses - this.context.system.scheduler.scheduleOnce and stores the key/Cancellable pair in cancellableMap
case CancelNotification(pushNotification) => //use stored Cancellable to cancel notification, if present
case Notification(key) => //actually send the push notification
}
这在本地运行良好,但一旦我将它部署到我的测试环境(多台机器),所有其他消息似乎都丢失了。我假设这是因为它试图将这些消息发送到 Server2,但我在这两个应用程序的日志文件中都没有看到任何错误。我试图在我的 akka 配置中添加更多日志记录,但我没有在 logs/application.log(默认播放框架日志)中看到任何额外的输出:
akka {
loglevel = "DEBUG"
log-config-on-start = on
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/router {
router = round-robin-group
routees.paths = [
"akka.tcp://notifications@server1:2552/user/notificationActor",
"akka.tcp://notifications@server2:2552/user/notificationActor"]
}
}
debug {
unhandled = on
}
}
remote {
log-sent-messages = on
log-received-messages = on
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "server1" // set to server1 or server 2 upon deployment
port = 2552
}
}
}
为什么 Server2 收不到消息?我可以在每个实例上使用来自所有服务器的参与者实例化一个参与者系统吗?他们应该能够交叉沟通吗?
此外,如果我将其过于复杂化,我愿意接受其他解决方案。如果我能让它工作的话,这似乎是最简单的方法。
我想我明白了。这个架构似乎可以工作,但我有两个问题,我通过 运行 从我的本地机器解决了它,并将 server1 和 server2 添加到我的配置中。
应用程序启动时我没有实例化我的 actor 系统 - 我使用的是惰性 vals,这意味着 actor 还没有准备好,因为它没有收到会导致它准备好的请求创建。换句话说,Server2 上的 notificationActor 实例没有监听,因为它还没有初始化。
为了解决这个问题,我将所有与 akka 相关的组件(系统、actor 和路由器)的初始化移动到我的 GlobalSettings class 的 onStart 方法中,以便在播放应用程序启动后立即开始.这是主要问题。
我的邮件无法序列化。