告诉发件人演员在多次重试后失败的最佳方法是什么
What is the best approach to tell a sender that an actor failed after a number of retries
我有父 -> 子角色关系,可以将文件上传到 Dropbox。该关系由一个主管角色和一个上传角色组成。 supervisor actor为上传actor定义了一个supervisor strategy。因此,如果上传到 Dropbox 失败,只要达到最大重试次数,actor 就应该重新启动。在我的应用程序中,我对上传状态感兴趣。所以我使用 ask 模式来接收未来的成功或失败案例。您可以在下面找到我的演员的当前实施情况。
/**
* An upload message.
*
* @param byte The byte array representing the content of a file.
* @param path The path under which the file should be stored.
*/
case class UploadMsg(byte: Array[Byte], path: String)
/**
* The upload supervisor.
*/
class UploadSupervisor extends Actor {
/**
* Stores the sender to the executing actor.
*/
val senders: ParHashMap[String, ActorRef] = ParHashMap()
/**
* Defines the supervisor strategy
*/
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: DbxException => Restart
case e: Exception => Stop
}
/**
* Handles the received messages.
*/
override def receive: Actor.Receive = {
case msg: UploadMsg =>
implicit val timeout = Timeout(60.seconds)
val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
senders += actor.path.toString -> sender
context.watch(actor)
ask(actor, msg).mapTo[Unit] pipeTo sender
case Terminated(a) =>
context.unwatch(a)
senders.get(a.path.toString).map { sender =>
sender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
senders - a.path.toString
}
}
}
/**
* An aktor which uploads a file to Dropbox.
*/
class UploadActor @Inject() (client: DropboxClient) extends Actor {
/**
* Sends the message again after restart.
*
* @param reason The reason why an restart occurred.
* @param message The message which causes the restart.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
message foreach { self forward }
}
/**
* Handles the received messages.
*/
override def receive: Receive = {
case UploadMsg(byte, path) =>
val encrypted = encryptor.encrypt(byte)
val is = new ByteArrayInputStream(encrypted)
try {
client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
sender ! (())
} finally {
is.close()
}
}
}
我现在的问题是:是否有更好的解决方案来告诉我的应用程序上传 actor 在指定次数或重试后失败。尤其是给演员存储发件人的map感觉有点别扭
你应该使用CircuitBreaker
val breaker =
new CircuitBreaker(context.system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute)
然后通过 breaker 给你包装消息:
sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
Breaker有三种状态:Closed、Open和HalfOpen。正常状态为 Closed,当消息失败 $maxFailures 次后状态变为 Open。
Breaker 为状态变化提供回调。如果您想做某事,请使用它。例如:
breaker onOpen { sender ! FailureMessage()}
我有父 -> 子角色关系,可以将文件上传到 Dropbox。该关系由一个主管角色和一个上传角色组成。 supervisor actor为上传actor定义了一个supervisor strategy。因此,如果上传到 Dropbox 失败,只要达到最大重试次数,actor 就应该重新启动。在我的应用程序中,我对上传状态感兴趣。所以我使用 ask 模式来接收未来的成功或失败案例。您可以在下面找到我的演员的当前实施情况。
/**
* An upload message.
*
* @param byte The byte array representing the content of a file.
* @param path The path under which the file should be stored.
*/
case class UploadMsg(byte: Array[Byte], path: String)
/**
* The upload supervisor.
*/
class UploadSupervisor extends Actor {
/**
* Stores the sender to the executing actor.
*/
val senders: ParHashMap[String, ActorRef] = ParHashMap()
/**
* Defines the supervisor strategy
*/
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: DbxException => Restart
case e: Exception => Stop
}
/**
* Handles the received messages.
*/
override def receive: Actor.Receive = {
case msg: UploadMsg =>
implicit val timeout = Timeout(60.seconds)
val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
senders += actor.path.toString -> sender
context.watch(actor)
ask(actor, msg).mapTo[Unit] pipeTo sender
case Terminated(a) =>
context.unwatch(a)
senders.get(a.path.toString).map { sender =>
sender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
senders - a.path.toString
}
}
}
/**
* An aktor which uploads a file to Dropbox.
*/
class UploadActor @Inject() (client: DropboxClient) extends Actor {
/**
* Sends the message again after restart.
*
* @param reason The reason why an restart occurred.
* @param message The message which causes the restart.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
message foreach { self forward }
}
/**
* Handles the received messages.
*/
override def receive: Receive = {
case UploadMsg(byte, path) =>
val encrypted = encryptor.encrypt(byte)
val is = new ByteArrayInputStream(encrypted)
try {
client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
sender ! (())
} finally {
is.close()
}
}
}
我现在的问题是:是否有更好的解决方案来告诉我的应用程序上传 actor 在指定次数或重试后失败。尤其是给演员存储发件人的map感觉有点别扭
你应该使用CircuitBreaker
val breaker =
new CircuitBreaker(context.system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute)
然后通过 breaker 给你包装消息:
sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
Breaker有三种状态:Closed、Open和HalfOpen。正常状态为 Closed,当消息失败 $maxFailures 次后状态变为 Open。 Breaker 为状态变化提供回调。如果您想做某事,请使用它。例如:
breaker onOpen { sender ! FailureMessage()}