Play Specs2:对 WebSocket 服务器的 FakeRequest

Play Specs2: FakeRequest to WebSocket server

我有一个使用以下方法的控制器

def subscribe(id: String) = WebSocket.tryAcceptWithActor[JsValue, JsValue]

我在conf/routes中写了以下路线:

GET     /subscribe/:id     @controllers.WsManager.subscribe(id: String)

该应用程序有效,但我想通过 Specs2 进行特定测试。

我尝试 "subscribe" 使用 websocket 端点:

val request = FakeRequest(GET, "/subscribe/1234")
val response = route(request)

val request = FakeRequest(GET, "ws://localhost:3333/subscribe/1234")
val response = route(request)

但在这两种情况下都不起作用:我收到 None 作为响应(response.isDefined 为假)。 在 Specs2 测试中是否有连接到 WebSocket 端点的正确方法?

Typesafe 提供了一个测试 WebSockets 的示例

/*
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package play.it.http.websocket

import play.api.test._
import play.api.Application
import scala.concurrent.{Future, Promise}
import play.api.mvc.{Handler, Results, WebSocket}
import play.api.libs.iteratee._
import java.net.URI
import org.jboss.netty.handler.codec.http.websocketx._
import org.specs2.matcher.Matcher
import akka.actor.{ActorRef, PoisonPill, Actor, Props}
import play.mvc.WebSocket.{Out, In}
import play.core.Router.HandlerDef
import java.util.concurrent.atomic.AtomicReference
import org.jboss.netty.buffer.ChannelBuffers

object WebSocketSpec extends PlaySpecification with WsTestClient {

  sequential

  def withServer[A](webSocket: Application => Handler)(block: => A): A = {
    val currentApp = new AtomicReference[FakeApplication]
    val app = FakeApplication(
      withRoutes = {
        case (_, _) => webSocket(currentApp.get())
      }
    )
    currentApp.set(app)
    running(TestServer(testServerPort, app))(block)
  }

  def runWebSocket[A](handler: (Enumerator[WebSocketFrame], Iteratee[WebSocketFrame, _]) => Future[A]): A = {
    val innerResult = Promise[A]()
    WebSocketClient { client =>
      await(client.connect(URI.create("ws://localhost:" + testServerPort + "/stream")) { (in, out) =>
        innerResult.completeWith(handler(in, out))
      })
    }
    await(innerResult.future)
  }

  def textFrame(matcher: Matcher[String]): Matcher[WebSocketFrame] = beLike {
    case t: TextWebSocketFrame => t.getText must matcher
  }

  def closeFrame(status: Int = 1000): Matcher[WebSocketFrame] = beLike {
    case close: CloseWebSocketFrame => close.getStatusCode must_== status
  }

  def binaryBuffer(text: String) = ChannelBuffers.wrappedBuffer(text.getBytes("utf-8"))

  /**
   * Iteratee getChunks that invokes a callback as soon as it's done.
   */
  def getChunks[A](chunks: List[A], onDone: List[A] => _): Iteratee[A, List[A]] = Cont {
    case Input.El(c) => getChunks(c :: chunks, onDone)
    case Input.EOF =>
      val result = chunks.reverse
      onDone(result)
      Done(result, Input.EOF)
    case Input.Empty => getChunks(chunks, onDone)
  }

  /*
   * Shared tests
   */
  def allowConsumingMessages(webSocket: Application => Promise[List[String]] => Handler) = {
    val consumed = Promise[List[String]]()
    withServer(app => webSocket(app)(consumed)) {
      val result = runWebSocket { (in, out) =>
        Enumerator(new TextWebSocketFrame("a"), new TextWebSocketFrame("b"), new CloseWebSocketFrame(1000, "")) |>>> out
        consumed.future
      }
      result must_== Seq("a", "b")
    }
  }

  def allowSendingMessages(webSocket: Application => List[String] => Handler) = {
    withServer(app => webSocket(app)(List("a", "b"))) {
      val frames = runWebSocket { (in, out) =>
        in |>>> Iteratee.getChunks[WebSocketFrame]
      }
      frames must contain(exactly(
        textFrame(be_==("a")),
        textFrame(be_==("b")),
        closeFrame()
      ).inOrder)
    }
  }

  def cleanUpWhenClosed(webSocket: Application => Promise[Boolean] => Handler) = {
    val cleanedUp = Promise[Boolean]()
    withServer(app => webSocket(app)(cleanedUp)) {
      runWebSocket { (in, out) =>
        out.run
        cleanedUp.future
      } must beTrue
    }
  }

  def closeWhenTheConsumerIsDone(webSocket: Application => Handler) = {
    withServer(app => webSocket(app)) {
      val frames = runWebSocket { (in, out) =>
        Enumerator[WebSocketFrame](new TextWebSocketFrame("foo")) |>> out
        in |>>> Iteratee.getChunks[WebSocketFrame]
      }
      frames must contain(exactly(
        closeFrame()
      ))
    }
  }

  def allowRejectingTheWebSocketWithAResult(webSocket: Application => Int => Handler) = {
    withServer(app => webSocket(app)(FORBIDDEN)) {
      implicit val port = testServerPort
      await(wsUrl("/stream").withHeaders(
        "Upgrade" -> "websocket",
        "Connection" -> "upgrade"
      ).get()).status must_== FORBIDDEN
    }
  }

  "Plays WebSockets" should {
    "allow consuming messages" in allowConsumingMessages { _ => consumed =>
      WebSocket.using[String] { req =>
        (getChunks[String](Nil, consumed.success _), Enumerator.empty)
      }
    }

    "allow sending messages" in allowSendingMessages { _ => messages =>
      WebSocket.using[String] { req =>
        (Iteratee.ignore, Enumerator.enumerate(messages) >>> Enumerator.eof)
      }
    }

    "close when the consumer is done" in closeWhenTheConsumerIsDone { _ =>
      WebSocket.using[String] { req =>
        (Iteratee.head, Enumerator.empty)
      }
    }

    "clean up when closed" in cleanUpWhenClosed { _ => cleanedUp =>
      WebSocket.using[String] { req =>
        (Iteratee.ignore, Enumerator.empty[String].onDoneEnumerating(cleanedUp.success(true)))
      }
    }

    "allow rejecting a websocket with a result" in allowRejectingTheWebSocketWithAResult { _ => statusCode =>
      WebSocket.tryAccept[String] { req =>
        Future.successful(Left(Results.Status(statusCode)))
      }
    }

    "allow handling a WebSocket with an actor" in {

      "allow consuming messages" in allowConsumingMessages { implicit app => consumed =>
        WebSocket.acceptWithActor[String, String] { req => out =>
          Props(new Actor() {
            var messages = List.empty[String]
            def receive = {
              case msg: String =>
                messages = msg :: messages
            }
            override def postStop() = {
              consumed.success(messages.reverse)
            }
          })
        }
      }

      "allow sending messages" in allowSendingMessages { implicit app => messages =>
        WebSocket.acceptWithActor[String, String] { req => out =>
          Props(new Actor() {
            messages.foreach { msg =>
              out ! msg
            }
            out ! PoisonPill
            def receive = PartialFunction.empty
          })
        }
      }

      "close when the consumer is done" in closeWhenTheConsumerIsDone { implicit app =>
      WebSocket.acceptWithActor[String, String] { req => out =>
          Props(new Actor() {
            out ! PoisonPill
            def receive = PartialFunction.empty
          })
        }
      }

      "clean up when closed" in cleanUpWhenClosed { implicit app => cleanedUp =>
        WebSocket.acceptWithActor[String, String] { req => out =>
          Props(new Actor() {
            def receive = PartialFunction.empty
            override def postStop() = {
              cleanedUp.success(true)
            }
          })
        }
      }

      "allow rejecting a websocket with a result" in allowRejectingTheWebSocketWithAResult { implicit app => statusCode =>
        WebSocket.tryAcceptWithActor[String, String] { req =>
          Future.successful(Left(Results.Status(statusCode)))
        }
      }

      "aggregate text frames" in {
        val consumed = Promise[List[String]]()
        withServer(app => WebSocket.using[String] { req =>
          (getChunks[String](Nil, consumed.success _), Enumerator.empty)
        }) {
          val result = runWebSocket { (in, out) =>
            Enumerator(
              new TextWebSocketFrame("first"),
              new TextWebSocketFrame(false, 0, "se"),
              new ContinuationWebSocketFrame(false, 0, "co"),
              new ContinuationWebSocketFrame(true, 0, "nd"),
              new TextWebSocketFrame("third"),
              new CloseWebSocketFrame(1000, "")) |>>> out
            consumed.future
          }
          result must_== Seq("first", "second", "third")
        }

      }

      "aggregate binary frames" in {
        val consumed = Promise[List[Array[Byte]]]()

        withServer(app => WebSocket.using[Array[Byte]] { req =>
          (getChunks[Array[Byte]](Nil, consumed.success _), Enumerator.empty)
        }) {
          val result = runWebSocket { (in, out) =>
            Enumerator(
              new BinaryWebSocketFrame(binaryBuffer("first")),
              new BinaryWebSocketFrame(false, 0, binaryBuffer("se")),
              new ContinuationWebSocketFrame(false, 0, binaryBuffer("co")),
              new ContinuationWebSocketFrame(true, 0, binaryBuffer("nd")),
              new BinaryWebSocketFrame(binaryBuffer("third")),
              new CloseWebSocketFrame(1000, "")) |>>> out
            consumed.future
          }
          result.map(b => b.toSeq) must_== Seq("first".getBytes("utf-8").toSeq, "second".getBytes("utf-8").toSeq, "third".getBytes("utf-8").toSeq)
        }
      }

      "close the websocket when the buffer limit is exceeded" in {
        withServer(app => WebSocket.using[String] { req =>
          (Iteratee.ignore, Enumerator.empty)
        }) {
          val frames = runWebSocket { (in, out) =>
            Enumerator[WebSocketFrame](
              new TextWebSocketFrame(false, 0, "first frame"),
              new ContinuationWebSocketFrame(true, 0, new String(Array.range(1, 65530).map(_ => 'a')))
            ) |>> out
            in |>>> Iteratee.getChunks[WebSocketFrame]
          }
          frames must contain(exactly(
            closeFrame(1009)
          ))
        }
      }

    }

    "allow handling a WebSocket in java" in {

      import play.core.Router.HandlerInvokerFactory
      import play.core.Router.HandlerInvokerFactory._
      import play.mvc.{ WebSocket => JWebSocket, Results => JResults }
      import play.libs.F

      implicit def toHandler[J <: AnyRef](javaHandler: J)(implicit factory: HandlerInvokerFactory[J]): Handler = {
        val invoker = factory.createInvoker(
          javaHandler,
          new HandlerDef(javaHandler.getClass.getClassLoader, "package", "controller", "method", Nil, "GET", "", "/stream")
        )
        invoker.call(javaHandler)
      }

      "allow consuming messages" in allowConsumingMessages { _ => consumed =>
        new JWebSocket[String] {
          @volatile var messages = List.empty[String]
          def onReady(in: In[String], out: Out[String]) = {
            in.onMessage(new F.Callback[String] {
              def invoke(msg: String) = messages = msg :: messages
            })
            in.onClose(new F.Callback0 {
              def invoke() = consumed.success(messages.reverse)
            })
          }
        }
      }

      "allow sending messages" in allowSendingMessages { _ => messages =>
        new JWebSocket[String] {
          def onReady(in: In[String], out: Out[String]) = {
            messages.foreach { msg =>
              out.write(msg)
            }
            out.close()
          }
        }
      }

      "clean up when closed" in cleanUpWhenClosed { _ => cleanedUp =>
        new JWebSocket[String] {
          def onReady(in: In[String], out: Out[String]) = {
            in.onClose(new F.Callback0 {
              def invoke() = cleanedUp.success(true)
            })
          }
        }
      }

      "allow rejecting a websocket with a result" in allowRejectingTheWebSocketWithAResult { _ => statusCode =>
        JWebSocket.reject[String](JResults.status(statusCode))
      }

      "allow handling a websocket with an actor" in allowSendingMessages { _ => messages =>
        JWebSocket.withActor[String](new F.Function[ActorRef, Props]() {
          def apply(out: ActorRef) = {
            Props(new Actor() {
              messages.foreach { msg =>
                out ! msg
              }
              out ! PoisonPill
              def receive = PartialFunction.empty
            })
          }
        })
      }
    }
  }

查看此代码和 WebSocketClient.scala(需要 class)在 https://github.com/playframework/playframework/tree/ca828431ab2c2c47bf0efede071dc9abced129b9/framework/src/play-integration-test/src/test/scala/play/it/http/websocket 用于播放版本 2.3.9。