根据发送的消息在 Akka 上恢复

Recover on Akka ask based on the message sent

我正在通过 ask 向演员发送不同的消息。在超时时,我想提供一个默认值,该值对于向演员询问的消息是不同的。

由于超时异常总是相同的,我不能在 recover 到 return 不同的默认值中使用它我需要发送原始消息。

如何才能做到这一点。

代码示例:

      val storageActorProxy = Flow[ByteString]
        .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
        .map(TCPMessage.decode)
        .ask[OperationResponse](storageActor)
        //TODO: looking for this recover; non-existent AFAIK
        .customRecover { 
            case Op1 => DefaultResponseA()
            case Op2 => DefaultResponseB()
        }
        .map(TCPMessage.encode(_).toByteString)

A​​kka 的 ask 方法实际上很容易重新创建 - 它只是一个 mapAsync 和一些额外的逻辑,以便在 actor 死亡时更好地出错 (see the code)。因此,只需手动使用 mapAsync 即可恢复询问错误。

val storageActorProxy = Flow[ByteString]
  .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
  .map(TCPMessage.decode)
  .mapAsync(parallelism = 2) { decodedMessage =>
     (storageActor ? decodedMessage).recover {
       case Op1 => DefaultResponseA()
       case Op2 => DefaultResponseB()
     }
  }
  .map(TCPMessage.encode(_).toByteString)