在 Play 2.5 中编写 BodyParser
Composing BodyParser in Play 2.5
给定具有此签名的函数:
def parser[A](otherParser: BodyParser[A]): BodyParser[A]
如何编写函数,以便在将请求 body 传递给 otherParser
之前对其进行检查和验证?
为简单起见,假设我想验证 header(可能 "Some-Header")的值与 body 完全匹配。所以如果我有这个动作:
def post(): Action(parser(parse.tolerantText)) { request =>
Ok(request.body)
}
当我发出像 curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post
这样的请求时,它应该 return "hello" 在响应 body 中状态为 200。如果我的请求是 curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post
它应该 return 没有 body.
的 400
这是我试过的方法。
这个无法编译,因为 otherParser(request).through(flow)
期望 flow
输出 ByteString
。这里的想法是,流可以通过 Either
输出通知累加器是否继续处理。我不确定如何让累加器知道上一步的状态。
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
Right(bytes)
} else {
Left(BadRequest)
}
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
// This fails to compile because flow needs to output a ByteString
acc.through(flow)
}
我也尝试过使用过滤器。这个确实编译并且写入的响应 body 是正确的。但是它总是 return 是 200 Ok
响应状态。
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
request.headers.get("Some-Header").contains(bytes.utf8String)
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
acc.through(flow)
}
我想出了一个使用 GraphStageWithMaterializedValue
的解决方案。这个概念是从 Play's maxLength
body parser 借来的。我在问题中的第一次尝试(未编译)之间的主要区别在于,我不应该尝试改变流,而是应该使用物化值来传达有关处理状态的信息。虽然我创建了一个 Flow[ByteString, Either[Result, ByteString], NotUsed]
,结果我需要的是一个 Flow[ByteString, ByteString, Future[Boolean]]
.
因此,我的 parser
函数最终看起来像这样:
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
statusFuture.flatMap { success =>
if (success) {
resultFuture.map {
case Left(result) => Left(result)
case Right(a) => Right(a)
}
} else {
Future.successful(Left(BadRequest))
}
}
})
}
重点是这一行:
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
一旦您能够创建此流程,剩下的事情就会到位。不幸的是 BodyValidator
非常冗长,感觉有点 boiler-platey。无论如何,它大多很容易阅读。 GraphStageWithMaterializedValue
期望你实现 def shape: S
(S
这里是 FlowShape[ByteString, ByteString]
)来指定这个图的输入类型和输出类型。它还希望您输入 def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)
(M
在这里是 Future[Boolean]
)来定义图表实际应该做什么。这是BodyValidator
的完整代码(我将在下面详细解释):
class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic, status.future)
}
}
您首先要创建一个 Inlet
和 Outlet
来为您的图形设置输入和输出
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
然后你用这些来定义shape
.
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
在 createLogicAndMaterializedValue
中,您需要初始化要实现的值。在这里,我使用了一个承诺,当我从流中获得完整数据时可以解决这个问题。我还创建了一个 ByteStringBuilder
来跟踪迭代之间的数据。
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
然后我创建一个 GraphStageLogic
来实际设置该图在每个处理点的作用。正在设置两个处理程序。一个是 InHandler
用于处理来自上游源的数据。另一个是 OutHandler
用于处理向下游发送的数据。在 OutHandler
中没有什么真正有趣的东西,所以我在这里忽略它,除了说它是必要的样板,以避免 IllegalStateException
。在 InHandler
中覆盖了三个方法:onPush
、onUpstreamFinish
和 onUpstreamFailure
。 onPush
在上游准备好新数据时调用。在这种方法中,我只是获取下一个数据块,将其写入 bodyBuffer
并将数据推送到下游。
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
onUpstreamFinish
在上游完成时调用(惊喜)。这就是将 body 与 header 进行比较的业务逻辑发生的地方。
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
onUpstreamFailure
的实现是为了当出现问题时,我也可以将物化的未来标记为失败。
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
然后我只是 return 我创建的 GraphStageLogic
和 status.future
作为一个元组。
给定具有此签名的函数:
def parser[A](otherParser: BodyParser[A]): BodyParser[A]
如何编写函数,以便在将请求 body 传递给 otherParser
之前对其进行检查和验证?
为简单起见,假设我想验证 header(可能 "Some-Header")的值与 body 完全匹配。所以如果我有这个动作:
def post(): Action(parser(parse.tolerantText)) { request =>
Ok(request.body)
}
当我发出像 curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post
这样的请求时,它应该 return "hello" 在响应 body 中状态为 200。如果我的请求是 curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post
它应该 return 没有 body.
这是我试过的方法。
这个无法编译,因为 otherParser(request).through(flow)
期望 flow
输出 ByteString
。这里的想法是,流可以通过 Either
输出通知累加器是否继续处理。我不确定如何让累加器知道上一步的状态。
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
Right(bytes)
} else {
Left(BadRequest)
}
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
// This fails to compile because flow needs to output a ByteString
acc.through(flow)
}
我也尝试过使用过滤器。这个确实编译并且写入的响应 body 是正确的。但是它总是 return 是 200 Ok
响应状态。
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
request.headers.get("Some-Header").contains(bytes.utf8String)
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
acc.through(flow)
}
我想出了一个使用 GraphStageWithMaterializedValue
的解决方案。这个概念是从 Play's maxLength
body parser 借来的。我在问题中的第一次尝试(未编译)之间的主要区别在于,我不应该尝试改变流,而是应该使用物化值来传达有关处理状态的信息。虽然我创建了一个 Flow[ByteString, Either[Result, ByteString], NotUsed]
,结果我需要的是一个 Flow[ByteString, ByteString, Future[Boolean]]
.
因此,我的 parser
函数最终看起来像这样:
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
statusFuture.flatMap { success =>
if (success) {
resultFuture.map {
case Left(result) => Left(result)
case Right(a) => Right(a)
}
} else {
Future.successful(Left(BadRequest))
}
}
})
}
重点是这一行:
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
一旦您能够创建此流程,剩下的事情就会到位。不幸的是 BodyValidator
非常冗长,感觉有点 boiler-platey。无论如何,它大多很容易阅读。 GraphStageWithMaterializedValue
期望你实现 def shape: S
(S
这里是 FlowShape[ByteString, ByteString]
)来指定这个图的输入类型和输出类型。它还希望您输入 def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)
(M
在这里是 Future[Boolean]
)来定义图表实际应该做什么。这是BodyValidator
的完整代码(我将在下面详细解释):
class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic, status.future)
}
}
您首先要创建一个 Inlet
和 Outlet
来为您的图形设置输入和输出
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
然后你用这些来定义shape
.
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
在 createLogicAndMaterializedValue
中,您需要初始化要实现的值。在这里,我使用了一个承诺,当我从流中获得完整数据时可以解决这个问题。我还创建了一个 ByteStringBuilder
来跟踪迭代之间的数据。
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
然后我创建一个 GraphStageLogic
来实际设置该图在每个处理点的作用。正在设置两个处理程序。一个是 InHandler
用于处理来自上游源的数据。另一个是 OutHandler
用于处理向下游发送的数据。在 OutHandler
中没有什么真正有趣的东西,所以我在这里忽略它,除了说它是必要的样板,以避免 IllegalStateException
。在 InHandler
中覆盖了三个方法:onPush
、onUpstreamFinish
和 onUpstreamFailure
。 onPush
在上游准备好新数据时调用。在这种方法中,我只是获取下一个数据块,将其写入 bodyBuffer
并将数据推送到下游。
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
onUpstreamFinish
在上游完成时调用(惊喜)。这就是将 body 与 header 进行比较的业务逻辑发生的地方。
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
onUpstreamFailure
的实现是为了当出现问题时,我也可以将物化的未来标记为失败。
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
然后我只是 return 我创建的 GraphStageLogic
和 status.future
作为一个元组。