在 Akka 中,如何将响应从下游参与者路由到正确的上游?

In Akka, how do I route responses from downstream actor to the correct upstream?

新手问题:我试图让一个缓存管理器位于缓存的多个用户(=上游)和 Redis 客户端(下游)之间,所以:

Client A  -----> |                
                 | Cache Manager <=====> Redis Connection --(tcp)--
Client B  -----> |

想法是重新使用与 Redis 的单个连接。我可以异步发送 SET 命令,当 redis 客户端 actor 返回响应时,我如何知道将响应中继到哪个客户端?到目前为止,这是我的接收方法:

def receive: PartialFunction[Any, Unit] = {

  case Store(key: ByteString, payload: ByteString, metadata: ByeString) => {
    // WIP: yes, I could batch these two here
    brandoClient ! Request(REDIS_SET, metadata_key(key), metadata)
    brandoClient ! Request(REDIS_SET, key, payload)
  }

  case Some(Ok) => {
    ???
  }
  ...
}

我能做到:

case Store(key: ByteString, payload: ByteString) => {
  val future = brandoClient ? Request(REDIS_SET, key, payload)
  sender() ! Await.result(future, request_timeout.duration)
}

但是,这会使缓存管理器阻塞。

我能想到的另一种方法是创建多个引用同一个 Redis 客户端 ActorRef 的缓存管理器 actor,这样我就可以通过这种方式对响应进行重复数据删除。像这样:

Client A  -----> Cache Manager A -----> |               
                                        | Redis Connection --(tcp)--
Client B  -----> Cache Manager B -----> |

这是唯一的方法吗?

谢谢,

如果您能够修改消息签名,那么您始终可以在每条消息中添加类似于 http correlation id 的内容。

每个传入请求都会附带一个 unique id,然后您可以保留一个 Map[UUID, ActorRef] 类型的内部映射。当您有 return 的数据时,只需在地图中查找相应的唯一 ID 并将数据发送回引用的 ActorRef。

您可以 pipe the result of the Future to the sender. The following example assumes that you're using the Brando Redis 客户端:

而不是阻塞
import akka.actor.Actor
import akka.pattern.{ ask, pipe }
import akka.util.{ ByteString, Timeout }
import brando.{ Request, StatusReply }
import scala.concurrent.duration._

case class Store(key: ByteString, payload: ByteString)

class CacheManager extends Actor {
  import context.dispatcher
  implicit val timeout = Timeout(5 seconds)

  val brandoClient: ActorRef = ???

  def receive = {
    case Store(key, payload) =>
      (brandoClient ? Request("SET", key, payload))
        .mapTo[Some[StatusReply]]
        .pipeTo(sender())

    // case ...
  }
}