Akka 将 websocket 流式传输到 Sink.seq 以异常 SubscriptionWithCancelException$StageWasCompleted 结束
Akka streams websocket stream things to a Sink.seq ends with exception SubscriptionWithCancelException$StageWasCompleted
我未能具体化 Sink.seq
,到了具体化的时候我失败了,出现了这个异常
akka.stream.SubscriptionWithCancelException$StageWasCompleted$:
我正在尝试将所有从 websocket 推出的元素聚合到 Sink.seq
中。在 Sink.seq
.
中聚合之前,我必须进行一些 json 转换
val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] =
Sink.seq[WalletNotification[_]]
val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message]
.map {
case message: TextMessage.Strict =>
//we should be able to parse the address message
val text = message.text
val notification: WalletNotification[_] = {
upickle.default.read[WalletNotification[_]](text)(
WsPicklers.walletNotificationPickler)
}
logger.info(s"Notification=$notification")
notification
case msg =>
logger.error(s"msg=$msg")
sys.error("")
}
.log(s"@@@ endSink @@@")
.toMat(endSink)(Keep.right)
val f: Flow[
Message,
Message,
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = {
Flow
.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
}
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, f)
}
val walletNotificationsF: Future[Seq[WalletNotification[_]]] =
tuple._2._1
val promise: Promise[Option[Message]] = tuple._2._2
logger.info(s"Requesting new address for expectedAddrStr")
val expectedAddressStr = ConsoleCli
.exec(CliCommand.GetNewAddress(labelOpt = None), cliConfig)
.get
val expectedAddress = BitcoinAddress.fromString(expectedAddressStr)
promise.success(None)
logger.info(s"before notificationsF")
//hangs here, as the future never gets completed, fails with an exception
for {
notifications <- walletNotificationsF
_ = logger.info(s"after notificationsF")
} yield {
//assertions in here...
}
我做错了什么?
要保持客户端连接打开,您需要“更多代码”,如下所示:
val sourceKickOff = Source
.single(TextMessage("kick off msg"))
// Keeps the connection open
.concatMat(Source.maybe[Message])(Keep.right)
查看完整的工作示例,它使用来自服务器的消息:
https://github.com/pbernet/akka_streams_tutorial/blob/b6d4c89a14bdc5d72c557d8cede59985ca8e525f/src/main/scala/akkahttp/WebsocketEcho.scala#L280
问题出在这一行
Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
需要
Flow.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)
当流终止时,物化流的 Coupled
部分将确保终止 Sink 下游。
我未能具体化 Sink.seq
,到了具体化的时候我失败了,出现了这个异常
akka.stream.SubscriptionWithCancelException$StageWasCompleted$:
我正在尝试将所有从 websocket 推出的元素聚合到 Sink.seq
中。在 Sink.seq
.
val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] =
Sink.seq[WalletNotification[_]]
val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message]
.map {
case message: TextMessage.Strict =>
//we should be able to parse the address message
val text = message.text
val notification: WalletNotification[_] = {
upickle.default.read[WalletNotification[_]](text)(
WsPicklers.walletNotificationPickler)
}
logger.info(s"Notification=$notification")
notification
case msg =>
logger.error(s"msg=$msg")
sys.error("")
}
.log(s"@@@ endSink @@@")
.toMat(endSink)(Keep.right)
val f: Flow[
Message,
Message,
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = {
Flow
.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
}
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, f)
}
val walletNotificationsF: Future[Seq[WalletNotification[_]]] =
tuple._2._1
val promise: Promise[Option[Message]] = tuple._2._2
logger.info(s"Requesting new address for expectedAddrStr")
val expectedAddressStr = ConsoleCli
.exec(CliCommand.GetNewAddress(labelOpt = None), cliConfig)
.get
val expectedAddress = BitcoinAddress.fromString(expectedAddressStr)
promise.success(None)
logger.info(s"before notificationsF")
//hangs here, as the future never gets completed, fails with an exception
for {
notifications <- walletNotificationsF
_ = logger.info(s"after notificationsF")
} yield {
//assertions in here...
}
我做错了什么?
要保持客户端连接打开,您需要“更多代码”,如下所示:
val sourceKickOff = Source
.single(TextMessage("kick off msg"))
// Keeps the connection open
.concatMat(Source.maybe[Message])(Keep.right)
查看完整的工作示例,它使用来自服务器的消息: https://github.com/pbernet/akka_streams_tutorial/blob/b6d4c89a14bdc5d72c557d8cede59985ca8e525f/src/main/scala/akkahttp/WebsocketEcho.scala#L280
问题出在这一行
Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
需要
Flow.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)
当流终止时,物化流的 Coupled
部分将确保终止 Sink 下游。