Caliban GraphQL 订阅的 Scala 集成测试
Scala integration tests for Caliban GraphQL subscriptions
我们有一个 Caliban GraphQL 应用程序,将其与 Play 框架一起使用。它很好地涵盖了查询和变更的集成测试,现在我们要为订阅添加一些集成测试,并想知道如何正确地进行。
对于 queries/mutations 测试,我们使用通常的 FakeRequest
,将其发送到我们扩展 Caliban PlayRouter
的路由器,效果非常好。有没有类似的方法可以测试websockets/subscriptions?
互联网上关于 Play 中的 websocket 测试的信息非常少,根本没有关于 GraphQL 订阅测试的信息。
如有任何想法,我们将不胜感激!
好的,我做到了。有几条规则要遵循:
- 使用 websocket header
"WebSocket-Protocol" -> "graphql-ws"
- 连接建立后,发送
GraphQLWSRequest
类型"connection_init"
- 收到响应
"connection_ack"
后,发送 "start"
类型的 GraphQLWSRequest
并将订阅查询作为负载
完成这些步骤后,服务器开始侦听,您可以发送变更查询。
一些草稿示例:
import caliban.client.GraphQLRequest
import caliban.client.ws.GraphQLWSRequest
import io.circe.syntax.EncoderOps
import play.api.libs.json.{JsValue, Json}
import play.api.test.Helpers.{POST, contentAsJson, contentAsString, contentType, route, status, _}
import org.awaitility.Awaitility
def getWS(subscriptionQuery: String, postQuery: String): JsValue = {
lazy val port = Helpers.testServerPort
val initRequest = prepareWSRequest("connection_init")
val startRequest = prepareWSRequest("start", Some(GraphQLRequest(subscriptionQuery, Map())))
Helpers.running(TestServer(port, app)) {
val headers = new java.util.HashMap[String, String]()
headers.put("WebSocket-Protocol", "graphql-ws")
val queue = new ArrayBlockingQueue[String](1)
lazy val ws = new WebSocketClient(new URI(s"ws://localhost:$port/ws/graphql"), headers) {
override def onOpen(handshakedata: ServerHandshake): Unit =
logger.info("Websocket connection established")
override def onClose(code: Port, reason: String, remote: Boolean): Unit =
logger.info(s"Websocket connection closed, reason: $reason")
override def onError(ex: Exception): Unit =
logger.error("Error handling websocket connection", ex)
override def onMessage(message: String): Unit = {
val ttp = (Json.parse(message) \ "type").as[JsString].value
if (ttp != "connection_ack" && ttp != "ka") queue.put(message)
}
}
ws.connectBlocking()
Future(ws.send(initRequest))
.flatMap(_ => Future(ws.send(startRequest)))
.flatMap(_ => post(query = postQuery)) // post is my local method, it sends usual FakeRequest
Awaitility.await().until(() => queue.peek() != null)
Json.parse(queue.take())
}
def prepareWSRequest(ttp: String, payload: Option[GraphQLRequest] = None) =
GraphQLWSRequest(ttp, None, payload).asJson.noSpaces
}
我们有一个 Caliban GraphQL 应用程序,将其与 Play 框架一起使用。它很好地涵盖了查询和变更的集成测试,现在我们要为订阅添加一些集成测试,并想知道如何正确地进行。
对于 queries/mutations 测试,我们使用通常的 FakeRequest
,将其发送到我们扩展 Caliban PlayRouter
的路由器,效果非常好。有没有类似的方法可以测试websockets/subscriptions?
互联网上关于 Play 中的 websocket 测试的信息非常少,根本没有关于 GraphQL 订阅测试的信息。
如有任何想法,我们将不胜感激!
好的,我做到了。有几条规则要遵循:
- 使用 websocket header
"WebSocket-Protocol" -> "graphql-ws"
- 连接建立后,发送
GraphQLWSRequest
类型"connection_init"
- 收到响应
"connection_ack"
后,发送"start"
类型的GraphQLWSRequest
并将订阅查询作为负载
完成这些步骤后,服务器开始侦听,您可以发送变更查询。
一些草稿示例:
import caliban.client.GraphQLRequest
import caliban.client.ws.GraphQLWSRequest
import io.circe.syntax.EncoderOps
import play.api.libs.json.{JsValue, Json}
import play.api.test.Helpers.{POST, contentAsJson, contentAsString, contentType, route, status, _}
import org.awaitility.Awaitility
def getWS(subscriptionQuery: String, postQuery: String): JsValue = {
lazy val port = Helpers.testServerPort
val initRequest = prepareWSRequest("connection_init")
val startRequest = prepareWSRequest("start", Some(GraphQLRequest(subscriptionQuery, Map())))
Helpers.running(TestServer(port, app)) {
val headers = new java.util.HashMap[String, String]()
headers.put("WebSocket-Protocol", "graphql-ws")
val queue = new ArrayBlockingQueue[String](1)
lazy val ws = new WebSocketClient(new URI(s"ws://localhost:$port/ws/graphql"), headers) {
override def onOpen(handshakedata: ServerHandshake): Unit =
logger.info("Websocket connection established")
override def onClose(code: Port, reason: String, remote: Boolean): Unit =
logger.info(s"Websocket connection closed, reason: $reason")
override def onError(ex: Exception): Unit =
logger.error("Error handling websocket connection", ex)
override def onMessage(message: String): Unit = {
val ttp = (Json.parse(message) \ "type").as[JsString].value
if (ttp != "connection_ack" && ttp != "ka") queue.put(message)
}
}
ws.connectBlocking()
Future(ws.send(initRequest))
.flatMap(_ => Future(ws.send(startRequest)))
.flatMap(_ => post(query = postQuery)) // post is my local method, it sends usual FakeRequest
Awaitility.await().until(() => queue.peek() != null)
Json.parse(queue.take())
}
def prepareWSRequest(ttp: String, payload: Option[GraphQLRequest] = None) =
GraphQLWSRequest(ttp, None, payload).asJson.noSpaces
}