根据发送的消息在 Akka 上恢复
Recover on Akka ask based on the message sent
我正在通过 ask
向演员发送不同的消息。在超时时,我想提供一个默认值,该值对于向演员询问的消息是不同的。
由于超时异常总是相同的,我不能在 recover
到 return 不同的默认值中使用它我需要发送原始消息。
如何才能做到这一点。
代码示例:
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.ask[OperationResponse](storageActor)
//TODO: looking for this recover; non-existent AFAIK
.customRecover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
.map(TCPMessage.encode(_).toByteString)
Akka 的 ask
方法实际上很容易重新创建 - 它只是一个 mapAsync
和一些额外的逻辑,以便在 actor 死亡时更好地出错 (see the code)。因此,只需手动使用 mapAsync
即可恢复询问错误。
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.mapAsync(parallelism = 2) { decodedMessage =>
(storageActor ? decodedMessage).recover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
}
.map(TCPMessage.encode(_).toByteString)
我正在通过 ask
向演员发送不同的消息。在超时时,我想提供一个默认值,该值对于向演员询问的消息是不同的。
由于超时异常总是相同的,我不能在 recover
到 return 不同的默认值中使用它我需要发送原始消息。
如何才能做到这一点。
代码示例:
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.ask[OperationResponse](storageActor)
//TODO: looking for this recover; non-existent AFAIK
.customRecover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
.map(TCPMessage.encode(_).toByteString)
Akka 的 ask
方法实际上很容易重新创建 - 它只是一个 mapAsync
和一些额外的逻辑,以便在 actor 死亡时更好地出错 (see the code)。因此,只需手动使用 mapAsync
即可恢复询问错误。
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.mapAsync(parallelism = 2) { decodedMessage =>
(storageActor ? decodedMessage).recover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
}
.map(TCPMessage.encode(_).toByteString)