Akka Stream 和 HTTP Scala:如何从路由向 Actor 发送消息
Akka Stream and HTTP Scala: How to send Messages to an Actor from a Route
我正在玩 akka-stream-and-http-experimental
1.0。到目前为止,我有一个可以接受和响应 HTTP 请求的用户服务。我还将有一个可以管理约会的约会服务。为了进行约会,必须是现有用户。如果用户存在,约会服务将检查用户服务。现在这显然可以通过 HTTP 完成,但我宁愿让预约服务向用户服务发送消息。作为新手,我不清楚如何使用参与者(如 akka-http
抽象)来发送和接收消息。在 doc 中提到了 ActorRef
和 ActorPublisher
但没有前者的例子,而后者看起来对我的需要来说有点矫枉过正。
我的代码如下所示,位于 Github:
trait UserReadResource extends ActorPlumbing {
val userService: UserService
val readRoute = {
// route stuff
}
}
trait ActorPlumbing {
implicit val system: ActorSystem
implicit def executor: ExecutionContextExecutor
implicit val materializer: Materializer
def config: Config
val logger: LoggingAdapter
}
trait UserService { // Implemented by Slick and MongoDB in the backend
def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}
object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
override implicit val system = ActorSystem()
override implicit def executor = system.dispatcher
override implicit val materializer = ActorMaterializer()
override def config = ConfigFactory.load()
override val logger = Logging(system, getClass)
private val collection = newCollection("users")
val userRepository = new MongoDBUserRepository(collection)
val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate {
// implicitly finds the executor in scope. Ain't that cute?
override implicit def executor = implicitly
}
Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}
编辑:
我想出了如何发送消息,这可以使用 Source.actorRef
来完成。这只会将消息发送到流中。我想要做的是让路由处理程序 class 接收响应。这样,当我创建约会服务时,它的参与者可以调用用户服务参与者并以与我的示例中的用户路由处理程序相同的方式接收响应。
伪代码:
val src = Source.single(name) \ How to send this to an actor and get the response
编辑 2:
根据@yardena 的回答,我想出了以下内容,但最后一行无法编译。我的演员发布者 returns a Future
,我猜它会被包装在 Promise
中,然后作为 Future
传递给路由处理程序。
get {
parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
type FindResponse = Future[FindByNameResponse]
val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
_ ! name
}
val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))
val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)
complete(src.runWith(sink)) // doesn't compile
}
}
这个link可能会有帮助:http://zuchos.com/blog/2015/05/23/how-to-write-a-subscriber-for-akka-streams/ and this answer by @Noah
基本上你有两个选择:
1) 如果你想要一个 "simple" actor,它将把它收到的所有消息转发到流中,你可以使用 Source.actorRef。然后,您可以通过使用 mapAsync 创建处理阶段,将消息管道化到 UserService。
2) 另一种选择,如果您希望演员有一些自定义行为,是编写您自己的 ActorPublisher。
HTH
我最终使用了 Actor.ask
。简单。
我正在玩 akka-stream-and-http-experimental
1.0。到目前为止,我有一个可以接受和响应 HTTP 请求的用户服务。我还将有一个可以管理约会的约会服务。为了进行约会,必须是现有用户。如果用户存在,约会服务将检查用户服务。现在这显然可以通过 HTTP 完成,但我宁愿让预约服务向用户服务发送消息。作为新手,我不清楚如何使用参与者(如 akka-http
抽象)来发送和接收消息。在 doc 中提到了 ActorRef
和 ActorPublisher
但没有前者的例子,而后者看起来对我的需要来说有点矫枉过正。
我的代码如下所示,位于 Github:
trait UserReadResource extends ActorPlumbing {
val userService: UserService
val readRoute = {
// route stuff
}
}
trait ActorPlumbing {
implicit val system: ActorSystem
implicit def executor: ExecutionContextExecutor
implicit val materializer: Materializer
def config: Config
val logger: LoggingAdapter
}
trait UserService { // Implemented by Slick and MongoDB in the backend
def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}
object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
override implicit val system = ActorSystem()
override implicit def executor = system.dispatcher
override implicit val materializer = ActorMaterializer()
override def config = ConfigFactory.load()
override val logger = Logging(system, getClass)
private val collection = newCollection("users")
val userRepository = new MongoDBUserRepository(collection)
val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate {
// implicitly finds the executor in scope. Ain't that cute?
override implicit def executor = implicitly
}
Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}
编辑:
我想出了如何发送消息,这可以使用 Source.actorRef
来完成。这只会将消息发送到流中。我想要做的是让路由处理程序 class 接收响应。这样,当我创建约会服务时,它的参与者可以调用用户服务参与者并以与我的示例中的用户路由处理程序相同的方式接收响应。
伪代码:
val src = Source.single(name) \ How to send this to an actor and get the response
编辑 2:
根据@yardena 的回答,我想出了以下内容,但最后一行无法编译。我的演员发布者 returns a Future
,我猜它会被包装在 Promise
中,然后作为 Future
传递给路由处理程序。
get {
parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
type FindResponse = Future[FindByNameResponse]
val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
_ ! name
}
val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))
val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)
complete(src.runWith(sink)) // doesn't compile
}
}
这个link可能会有帮助:http://zuchos.com/blog/2015/05/23/how-to-write-a-subscriber-for-akka-streams/ and this answer by @Noah
基本上你有两个选择: 1) 如果你想要一个 "simple" actor,它将把它收到的所有消息转发到流中,你可以使用 Source.actorRef。然后,您可以通过使用 mapAsync 创建处理阶段,将消息管道化到 UserService。 2) 另一种选择,如果您希望演员有一些自定义行为,是编写您自己的 ActorPublisher。
HTH
我最终使用了 Actor.ask
。简单。