在 Akka 中,如何将响应从下游参与者路由到正确的上游?
In Akka, how do I route responses from downstream actor to the correct upstream?
新手问题:我试图让一个缓存管理器位于缓存的多个用户(=上游)和 Redis 客户端(下游)之间,所以:
Client A -----> |
| Cache Manager <=====> Redis Connection --(tcp)--
Client B -----> |
想法是重新使用与 Redis 的单个连接。我可以异步发送 SET 命令,当 redis 客户端 actor 返回响应时,我如何知道将响应中继到哪个客户端?到目前为止,这是我的接收方法:
def receive: PartialFunction[Any, Unit] = {
case Store(key: ByteString, payload: ByteString, metadata: ByeString) => {
// WIP: yes, I could batch these two here
brandoClient ! Request(REDIS_SET, metadata_key(key), metadata)
brandoClient ! Request(REDIS_SET, key, payload)
}
case Some(Ok) => {
???
}
...
}
我能做到:
case Store(key: ByteString, payload: ByteString) => {
val future = brandoClient ? Request(REDIS_SET, key, payload)
sender() ! Await.result(future, request_timeout.duration)
}
但是,这会使缓存管理器阻塞。
我能想到的另一种方法是创建多个引用同一个 Redis 客户端 ActorRef 的缓存管理器 actor,这样我就可以通过这种方式对响应进行重复数据删除。像这样:
Client A -----> Cache Manager A -----> |
| Redis Connection --(tcp)--
Client B -----> Cache Manager B -----> |
这是唯一的方法吗?
谢谢,
如果您能够修改消息签名,那么您始终可以在每条消息中添加类似于 http correlation id 的内容。
每个传入请求都会附带一个 unique id,然后您可以保留一个 Map[UUID, ActorRef]
类型的内部映射。当您有 return 的数据时,只需在地图中查找相应的唯一 ID 并将数据发送回引用的 ActorRef。
您可以 pipe the result of the Future
to the sender. The following example assumes that you're using the Brando Redis 客户端:
而不是阻塞
import akka.actor.Actor
import akka.pattern.{ ask, pipe }
import akka.util.{ ByteString, Timeout }
import brando.{ Request, StatusReply }
import scala.concurrent.duration._
case class Store(key: ByteString, payload: ByteString)
class CacheManager extends Actor {
import context.dispatcher
implicit val timeout = Timeout(5 seconds)
val brandoClient: ActorRef = ???
def receive = {
case Store(key, payload) =>
(brandoClient ? Request("SET", key, payload))
.mapTo[Some[StatusReply]]
.pipeTo(sender())
// case ...
}
}
新手问题:我试图让一个缓存管理器位于缓存的多个用户(=上游)和 Redis 客户端(下游)之间,所以:
Client A -----> |
| Cache Manager <=====> Redis Connection --(tcp)--
Client B -----> |
想法是重新使用与 Redis 的单个连接。我可以异步发送 SET 命令,当 redis 客户端 actor 返回响应时,我如何知道将响应中继到哪个客户端?到目前为止,这是我的接收方法:
def receive: PartialFunction[Any, Unit] = {
case Store(key: ByteString, payload: ByteString, metadata: ByeString) => {
// WIP: yes, I could batch these two here
brandoClient ! Request(REDIS_SET, metadata_key(key), metadata)
brandoClient ! Request(REDIS_SET, key, payload)
}
case Some(Ok) => {
???
}
...
}
我能做到:
case Store(key: ByteString, payload: ByteString) => {
val future = brandoClient ? Request(REDIS_SET, key, payload)
sender() ! Await.result(future, request_timeout.duration)
}
但是,这会使缓存管理器阻塞。
我能想到的另一种方法是创建多个引用同一个 Redis 客户端 ActorRef 的缓存管理器 actor,这样我就可以通过这种方式对响应进行重复数据删除。像这样:
Client A -----> Cache Manager A -----> |
| Redis Connection --(tcp)--
Client B -----> Cache Manager B -----> |
这是唯一的方法吗?
谢谢,
如果您能够修改消息签名,那么您始终可以在每条消息中添加类似于 http correlation id 的内容。
每个传入请求都会附带一个 unique id,然后您可以保留一个 Map[UUID, ActorRef]
类型的内部映射。当您有 return 的数据时,只需在地图中查找相应的唯一 ID 并将数据发送回引用的 ActorRef。
您可以 pipe the result of the Future
to the sender. The following example assumes that you're using the Brando Redis 客户端:
import akka.actor.Actor
import akka.pattern.{ ask, pipe }
import akka.util.{ ByteString, Timeout }
import brando.{ Request, StatusReply }
import scala.concurrent.duration._
case class Store(key: ByteString, payload: ByteString)
class CacheManager extends Actor {
import context.dispatcher
implicit val timeout = Timeout(5 seconds)
val brandoClient: ActorRef = ???
def receive = {
case Store(key, payload) =>
(brandoClient ? Request("SET", key, payload))
.mapTo[Some[StatusReply]]
.pipeTo(sender())
// case ...
}
}