Use an Actor as a source for my WebSocket client flow

I currently have a simple TextMessage Source that sends messages to my WebSocket client flow like this:

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
      }

    // send this as a message over the WebSocket
    val outgoing: Source[TextMessage.Strict, NotUsed] = Source
      .combine(
        Source.single(
          TextMessage(
            """{"auth":"APIKEY-123"}"""
          )
        ),
        Source.single(
          TextMessage(
            """{"topic":"topic123"}"""
          )
        ),
        Source.never
      )(Merge(_))
      .throttle(1, 1.second)

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

So my source currently has the type Source[TextMessage.Strict, NotUsed], but I want to replace it with a source that is backed by an ActorRef.

I tried this:

val actorSource: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = { case Done =>
    CompletionStrategy.immediately
  },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef !  """{"auth":"APIKEY-123"}"""

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val (upgradeResponse, closed) =
  actorSource
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

So when I use an ActorRef as my source, I am having trouble wiring it into the graph. I get this compile-time error:

type mismatch;
[error]  found   : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]
[error]     .viaMat(webSocketFlow)(

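Note: I want an Actor as my source and also as my sink, i.e. every message the flow produces should be passed on to another Actor.

Can someone explain what I am doing wrong when I use my Actor as the source and try to add it to my flow/graph?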

Update

This is my code now:

def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val actorSource = Source.actorRef[String](
      completionMatcher = { case Done =>
        CompletionStrategy.immediately
      },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val ((sendActor, upgradeResponse), closed) =
      actorSource
        .viaMat(webSocketFlow)(
          Keep.both
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular HTTP request, the response status is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")

    //in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }

I get the following compile errors:

[error] The argument types of an anonymous function must be fully known. (SLS 8.5)
[error] Expected type was: ?
[error]       completionMatcher = { case Done =>
[error]                           ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any
[error]     val connected = upgradeResponse.flatMap { upgrade =>
[error]                                     ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any
[error]     sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
[error]               ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any
[error]     sendActor ! TextMessage("""{"topic":"topic123"}""")
[error]               ^
[error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any
[error]     closed.foreach(_ => println("closed"))
[error]            ^
[error] 5 errors found

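Your compiler error occurs because your actorSource emits String rather than Message. (That is not the error the code you posted should produce; perhaps you had tried changing it to Source[String, ActorRef]?) Since webSocketFlow only processes Message elements, it can only be attached to a source of Message.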
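To illustrate the mismatch, here is a minimal sketch of a String-emitting actor source that becomes a source of Message once each element is converted. It assumes Akka 2.6 and Akka HTTP 10.2 on the classpath; the object name, the `asMessage` helper, the `drainOnDone` and `neverFail` values and the system name are hypothetical names chosen for this example. It collects the messages locally instead of opening a WebSocket connection.

```scala
import akka.Done
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object StringSourceSketch {
  // Hypothetical helper: wraps plain text in a strict WebSocket text message.
  def asMessage(text: String): Message = TextMessage(text)

  // Explicitly typed matchers, so the compiler does not have to infer
  // the argument type of an anonymous function.
  // `draining` emits already buffered elements before completing the stream.
  val drainOnDone: PartialFunction[Any, CompletionStrategy] = {
    case Done => CompletionStrategy.draining
  }
  val neverFail: PartialFunction[Any, Throwable] = PartialFunction.empty

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("string-source-sketch")

    // This source emits String, so it cannot be attached to a Flow[Message, Message, _] as is.
    val stringSource: Source[String, ActorRef] = Source.actorRef[String](
      completionMatcher = drainOnDone,
      failureMatcher = neverFail,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    // After the map the element type is Message, which is what a WebSocket flow expects.
    val messageSource: Source[Message, ActorRef] = stringSource.map(asMessage)

    // Assumption for the demo: collect into a Seq instead of connecting to a server.
    val (sendActor, collected) =
      messageSource.toMat(Sink.seq[Message])(Keep.both).run()

    sendActor ! """{"auth":"APIKEY-123"}"""
    sendActor ! """{"topic":"topic123"}"""
    sendActor ! Done // matched by drainOnDone, completes the stream

    println(Await.result(collected, 5.seconds))
    system.terminate()
  }
}
```

Declaring the source as Source.actorRef[Message] in the first place makes the extra map unnecessary.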

So I suggest the following:

val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
  case Done => CompletionStrategy.immediately
}

val actorSource = Source.actorRef[Message](
  completionMatcher = immediateCompletion,
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val ((sendActor, upgradeResponse), closed) =
  actorSource
    .viaMat(webSocketFlow)(Keep.both)  // keep both the actor and the upgradeResponse
    .toMat(incoming)(Keep.both)  // ...and also keep the closed
    .run()

sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
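
The question also asks for an Actor on the sink side. The following is a minimal sketch of one way to do that with Sink.actorRef, assuming Akka 2.6 classic actors and Akka HTTP 10.2. The `Receiver` actor and the `StreamCompleted` and `StreamFailed` messages are hypothetical names introduced for this example, not part of the original code.

```scala
import akka.Done
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

object ActorSinkSketch {
  // Hypothetical control messages delivered to the receiving actor.
  case object StreamCompleted
  final case class StreamFailed(cause: Throwable)

  // Hypothetical receiver: prints strict text messages coming from the WebSocket.
  class Receiver extends Actor {
    def receive: Receive = {
      case message: TextMessage.Strict => println(message.text)
      case StreamCompleted             => println("closed")
      case StreamFailed(cause)         => println(s"failed: ${cause.getMessage}")
      case _                           => // ignore other message types
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("actor-sink-sketch")
    import system.dispatcher

    val receiver: ActorRef = system.actorOf(Props(new Receiver), "receiver")

    val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
      case Done => CompletionStrategy.immediately
    }
    val neverFail: PartialFunction[Any, Throwable] = PartialFunction.empty

    val actorSource = Source.actorRef[Message](
      completionMatcher = immediateCompletion,
      failureMatcher = neverFail,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    // Every element is forwarded to `receiver`; completion and failure
    // arrive as the control messages defined above.
    val actorSink = Sink.actorRef[Message](
      receiver,
      StreamCompleted,
      (cause: Throwable) => StreamFailed(cause)
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // Sink.actorRef materializes NotUsed, so only the source actor and the upgrade response are kept.
    val (sendActor, upgradeResponse) =
      actorSource
        .viaMat(webSocketFlow)(Keep.both)
        .to(actorSink)
        .run()

    upgradeResponse.foreach(upgrade => println(upgrade.response.status))

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")
  }
}
```

Sink.actorRef applies no backpressure to the receiving actor. If the receiver may fall behind, Sink.actorRefWithBackpressure is probably the better fit, at the cost of an acknowledgement protocol between the stream and the actor.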