在喷雾路线中链接 Akka Actor
Chaining Akka Actors in a Spray Route
我有一个 restful API 接收一组 JSON 消息,这些消息将被转换为单独的 Avro 消息,然后发送到 Kafka。在路由中,我调用了 3 个不同的参与者:1) 一个参与者出去并从磁盘中检索 Avro 模式 2) 然后遍历 JSON 消息数组并将其与第二个参与者中的 Avro 模式进行比较。如果任何消息未通过验证,那么我需要 return 向 API 的调用方返回一个响应并停止处理。 3) 遍历数组并传递给第 3 个获取 JSON object 的参与者,将其转换为 Avro 消息并发送到 Kafka 主题。
让我头疼的问题是,如果其中一位演员出现故障,如何停止处理路线。我将请求上下文传递给每个参与者并调用它的完整方法,但它似乎并没有立即停止,下一个参与者仍在处理,即使它不应该处理。这是我在路线中所做的代码片段:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我已经浏览了很多关于此主题的 blogs/books/slides 资料,但不确定最佳方法是什么。我已经使用 Scala/Akka 大约 2 个月了,基本上是自学我一直需要的部分。因此,非常感谢经验丰富的 Scala/Akka/Spray 开发人员对此有任何见解。我的想法是将 3 个演员包装在一个 'master' 演员中,并使每个演员成为该演员的 child 并尝试那样处理它。
由于您正在使用异步处理 (!
),您无法控制消息发送后的处理。您需要使用 ask (?
) 来 return 您可以使用的未来。
但我有一个更好的主意。您可以从第一个演员向第二个演员发送消息。而不是 return 将结果发送给第一个参与者,您可以将消息发送给第三个参与者以继续计算。
我有一个 restful API 接收一组 JSON 消息,这些消息将被转换为单独的 Avro 消息,然后发送到 Kafka。在路由中,我调用了 3 个不同的参与者:1) 一个参与者出去并从磁盘中检索 Avro 模式 2) 然后遍历 JSON 消息数组并将其与第二个参与者中的 Avro 模式进行比较。如果任何消息未通过验证,那么我需要 return 向 API 的调用方返回一个响应并停止处理。 3) 遍历数组并传递给第 3 个获取 JSON object 的参与者,将其转换为 Avro 消息并发送到 Kafka 主题。
让我头疼的问题是,如果其中一位演员出现故障,如何停止处理路线。我将请求上下文传递给每个参与者并调用它的完整方法,但它似乎并没有立即停止,下一个参与者仍在处理,即使它不应该处理。这是我在路线中所做的代码片段:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我已经浏览了很多关于此主题的 blogs/books/slides 资料,但不确定最佳方法是什么。我已经使用 Scala/Akka 大约 2 个月了,基本上是自学我一直需要的部分。因此,非常感谢经验丰富的 Scala/Akka/Spray 开发人员对此有任何见解。我的想法是将 3 个演员包装在一个 'master' 演员中,并使每个演员成为该演员的 child 并尝试那样处理它。
由于您正在使用异步处理 (!
),您无法控制消息发送后的处理。您需要使用 ask (?
) 来 return 您可以使用的未来。
但我有一个更好的主意。您可以从第一个演员向第二个演员发送消息。而不是 return 将结果发送给第一个参与者,您可以将消息发送给第三个参与者以继续计算。