Akka Streams 拆分 Stream 以进行错误处理
Akka Streams split Stream for Error handling
我正在使用 akka http 和流来满足 API 请求。
当请求无效时,我想 return 一个 400,如果它有效,我想继续计算,然后 return 结果。
我面临的问题是,我从 POST 请求收到的有效负载是一个源,我无法将其转换为 2 个流(一个用于有效输入数据,一个用于无效输入数据)并正确完成请求.
path("alarms")(
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw Exception("Wrong input"))
))
complete(repository.create(flow).run) // <-- here I only want to pass all events that are valid. For the other events complete(HttpResponse(NotFound, entity = "Invalid input")) should be used
})
)
/// The signature of the repository.create looks like that
def create(message: Source[String, NotUsed]): RunnableGraph[Future[Done]]
你可以使用 akka-http handleExceptions
指令,像这样:
val exceptionHandler = ExceptionHandler {
case ex: RuntimeException =>
complete(HttpResponse(NotFound, entity = "Invalid input"))
}
path("alarms")(
handleExceptions(exceptionHandler) {
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw new RuntimeException("Invalid input"))
))
complete(repository.create(flow).run)
})
}
)
文档:
https://doc.akka.io/docs/akka-http/current/routing-dsl/exception-handling.html
还有 handleRejections
指令可以进行更多控制 - 请参阅 https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/execution-directives/handleRejections.html
我正在使用 akka http 和流来满足 API 请求。 当请求无效时,我想 return 一个 400,如果它有效,我想继续计算,然后 return 结果。 我面临的问题是,我从 POST 请求收到的有效负载是一个源,我无法将其转换为 2 个流(一个用于有效输入数据,一个用于无效输入数据)并正确完成请求.
path("alarms")(
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw Exception("Wrong input"))
))
complete(repository.create(flow).run) // <-- here I only want to pass all events that are valid. For the other events complete(HttpResponse(NotFound, entity = "Invalid input")) should be used
})
)
/// The signature of the repository.create looks like that
def create(message: Source[String, NotUsed]): RunnableGraph[Future[Done]]
你可以使用 akka-http handleExceptions
指令,像这样:
val exceptionHandler = ExceptionHandler {
case ex: RuntimeException =>
complete(HttpResponse(NotFound, entity = "Invalid input"))
}
path("alarms")(
handleExceptions(exceptionHandler) {
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw new RuntimeException("Invalid input"))
))
complete(repository.create(flow).run)
})
}
)
文档:
https://doc.akka.io/docs/akka-http/current/routing-dsl/exception-handling.html
还有 handleRejections
指令可以进行更多控制 - 请参阅 https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/execution-directives/handleRejections.html