Use an Actor as a source for my WebSocket client flow
I currently have a simple TextMessage Source that sends messages to my WebSocket client flow like this:
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
      }

    // send this as a message over the WebSocket
    val outgoing: Source[TextMessage.Strict, NotUsed] = Source
      .combine(
        Source.single(
          TextMessage(
            """{"auth":"APIKEY-123"}"""
          )
        ),
        Source.single(
          TextMessage(
            """{"topic":"topic123"}"""
          )
        ),
        Source.never
      )(Merge(_))
      .throttle(1, 1.second)

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }
So I currently have a source of type Source[TextMessage.Strict, NotUsed], but I want to use the commented-out code, where I have an ActorRef as the source.
I tried this:
    val actorSource: Source[Any, ActorRef] = Source.actorRef(
      completionMatcher = { case Done =>
        CompletionStrategy.immediately
      },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )
    val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
    actorRef ! """{"auth":"APIKEY-123"}"""

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      actorSource
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()
So when I use an ActorRef as my source, I am having trouble fitting it into the graph. I get this compile-time error:
    type mismatch;
    [error]  found   : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]]
    [error]  required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]
    [error]       .viaMat(webSocketFlow)(
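Note: I want an Actor as my source and also as my sink, i.e. every message the stream produces should be passed to another Actor that acts as the sink.
Can someone explain what I am doing wrong when I use my Actor as the source and try to add it to my flow/graph?
Update
Here is my current code: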
    def main(args: Array[String]): Unit = {
      implicit val system = ActorSystem()
      import system.dispatcher

      val incoming: Sink[Message, Future[Done]] =
        Sink.foreach[Message] {
          case message: TextMessage.Strict =>
            println(message.text)
          case _ =>
          // ignore other message types
        }

      val actorSource = Source.actorRef[String](
        completionMatcher = { case Done =>
          CompletionStrategy.immediately
        },
        failureMatcher = PartialFunction.empty,
        bufferSize = 100,
        overflowStrategy = OverflowStrategy.dropHead
      )

      val webSocketFlow = Http().webSocketClientFlow(
        WebSocketRequest("wss://socket.polygon.io/stocks")
      )

      // the materialized value is a tuple with
      // upgradeResponse is a Future[WebSocketUpgradeResponse] that
      // completes or fails when the connection succeeds or fails
      // and closed is a Future[Done] with the stream completion from the incoming sink
      val ((sendActor, upgradeResponse), closed) =
        actorSource
          .viaMat(webSocketFlow)(
            Keep.both
          ) // keep the materialized Future[WebSocketUpgradeResponse]
          .toMat(incoming)(Keep.both) // also keep the Future[Done]
          .run()

      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(
            s"Connection failed: ${upgrade.response.status}"
          )
        }
      }

      sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
      sendActor ! TextMessage("""{"topic":"topic123"}""")

      //in a real application you would not side effect here
      connected.onComplete(println)
      closed.foreach(_ => println("closed"))
    }
I get the following compile errors:
    [error] The argument types of an anonymous function must be fully known. (SLS 8.5)
    [error] Expected type was: ?
    [error]       completionMatcher = { case Done =>
    [error]                           ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any
    [error]     val connected = upgradeResponse.flatMap { upgrade =>
    [error]                                     ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any
    [error]     sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    [error]               ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any
    [error]     sendActor ! TextMessage("""{"topic":"topic123"}""")
    [error]               ^
    [error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any
    [error]     closed.foreach(_ => println("closed"))
    [error]            ^
    [error] 5 errors found
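Your type-mismatch error arises because your actorSource emits String rather than Message (that is not the error the code you first posted should produce; perhaps you had tried changing it to Source[String, ActorRef]?). Since webSocketFlow only handles Message, it can only be attached to a source of Message.
Giving the completion matcher an explicit PartialFunction[Any, CompletionStrategy] type should also take care of the "argument types of an anonymous function must be fully known" error, because the compiler then no longer has to infer the argument type of the pattern match. The remaining "is not a member of Any" errors are most likely just fallout from that first failure.
So I suggest the following: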
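    import akka.Done
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
    import akka.stream.{CompletionStrategy, OverflowStrategy}
    import akka.stream.scaladsl.{Keep, Source}

    // explicit type, so the compiler knows the argument type of the pattern match
    val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
      case Done => CompletionStrategy.immediately
    }

    // the source emits Message (not String), which is what webSocketFlow accepts
    val actorSource = Source.actorRef[Message](
      completionMatcher = immediateCompletion,
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // `incoming` is the Sink[Message, Future[Done]] from the question
    val ((sendActor, upgradeResponse), closed) =
      actorSource
        .viaMat(webSocketFlow)(Keep.both) // keep both the actor and the upgradeResponse
        .toMat(incoming)(Keep.both) // ...and also keep the closed
        .run()

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")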
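
The question also asks for an Actor on the receiving side, and the type mismatch can alternatively be handled by keeping a String-typed actor and converting before the WebSocket flow. The following is only a sketch of both ideas, assuming Akka 2.6 or later (so that the materializer is derived from the implicit ActorSystem). The names PrintingReceiver, WsActorBridge, completeOnDone, outgoing, incoming and connect are made up for this illustration and do not come from the question or from Akka.

```scala
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorRef, ActorSystem, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

// Hypothetical receiver: prints every strict text frame the stream forwards to it.
class PrintingReceiver extends Actor {
  def receive: Receive = {
    case TextMessage.Strict(text) => println(s"received: $text")
    case Done                     => println("stream completed")
    case Status.Failure(cause)    => println(s"stream failed: $cause")
    case _                        => // streamed and binary frames are ignored in this sketch
  }
}

object WsActorBridge {
  // Explicitly typed, so the compiler does not have to infer the argument type.
  val completeOnDone: PartialFunction[Any, CompletionStrategy] = {
    case Done => CompletionStrategy.immediately
  }

  // The materialized actor accepts String; each String is wrapped in a
  // TextMessage before it reaches the WebSocket flow, so the types line up.
  def outgoing: Source[Message, ActorRef] =
    Source
      .actorRef[String](
        completionMatcher = completeOnDone,
        failureMatcher = PartialFunction.empty,
        bufferSize = 100,
        overflowStrategy = OverflowStrategy.dropHead
      )
      .map(text => TextMessage(text))

  // Forwards every incoming Message to `receiver`, then Done on completion
  // or Status.Failure on error.
  def incoming(receiver: ActorRef): Sink[Message, NotUsed] =
    Sink.actorRef[Message](
      receiver,
      onCompleteMessage = Done,
      onFailureMessage = (cause: Throwable) => Status.Failure(cause)
    )

  // Wires source actor -> WebSocket -> receiver actor and runs the stream.
  def connect(url: String, receiver: ActorRef)(
      implicit system: ActorSystem
  ): (ActorRef, Future[WebSocketUpgradeResponse]) =
    outgoing
      .viaMat(Http().webSocketClientFlow(WebSocketRequest(url)))(Keep.both)
      .to(incoming(receiver))
      .run()
}

// Possible usage (opens a real connection, so it is left as a comment):
//   implicit val system: ActorSystem = ActorSystem()
//   val receiver = system.actorOf(akka.actor.Props(new PrintingReceiver))
//   val (sendActor, upgradeResponse) =
//     WsActorBridge.connect("wss://socket.polygon.io/stocks", receiver)
//   sendActor ! """{"auth":"APIKEY-123"}"""
//   sendActor ! Done // completes the outgoing side
```

Note that Sink.actorRef applies no backpressure to the receiving actor; if the receiver can fall behind, Sink.actorRefWithBackpressure may be the better fit.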