如何使用 Source.asSubscriber 来包装反应式监听器?
How do I use Source.asSubscriber to wrap reactive listener?
如何使用 Source.asSubscriber
包装响应式监听器?我不明白它的好处。
我正在尝试为 asynchttpclient WebSocket 创建 Source[T]
。这是我的代码:
def createWsObservable(url: String, onStartAction: Option[WebSocket ⇒ Unit]): Source[WsMessage, KillSwitch] =
Source.asSubscriber[WsMessage].mapMaterializedValue { subs: Subscriber[WsMessage] ⇒
val listener: WebSocketListener = new WebSocketListener() {
override def onOpen(ws: WebSocket): Unit =
subs.onNext(WsOpen(ws))
override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
subs.onComplete()
override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onError(t: Throwable): Unit =
subs.onError(t)
override def onPongFrame(payload: Array[Byte]): Unit = {
super.onPingFrame(payload)
}
}
val websocket =
asyncHttpClient
.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
new KillSwitch {
override def shutdown(): Unit = websocket.sendCloseFrame()
override def abort(ex: Throwable): Unit = websocket.sendCloseFrame()
}
}
在第一个事件中我得到异常:
java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand
at akka.stream.impl.VirtualProcessor.rec(StreamLayout.scala:239)
at akka.stream.impl.VirtualProcessor.onNext(StreamLayout.scala:243)
at ingestion.NettyClientWrapper$$anon.onOpen(NettyClientWrapper.scala:55)
也许 Source.asSubscriber
对我来说不是个好选择?我应该怎么做才能将 reactivestreams Subscriber 包装到 akka 的 Source 中?
如果您有一个 Publisher
想要公开为 Source
,您可以使用 Source.fromPublisher
。
对于您的情况,第一个问题是如何从 AHC 获得 Publisher
。我不知道 AHC,但从我正在阅读的内容来看,您是否可以同时拥有 Reactive Streams 支持(这需要您将 StreamedAsyncHandler
传递给 execute
方法)和 WebSocket 支持(这需要 WebSocketUpgradeHandler
代替)。希望能把它们组合起来
我最终使用 Source.actorRef
和 KillSwitch
def createWsObservable(url: String, onStartAction: WebSocket ⇒ Option[KillSwitch] = _ ⇒ None, bufferSize: Int = 32): Source[WsMessage, KillSwitch] = {
val actorSource = Source.actorRef[WsMessage](bufferSize, OverflowStrategy.fail)
actorSource.mapMaterializedValue { actor ⇒
val listener: WebSocketListener = new WebSocketListener() {
override def onOpen(ws: WebSocket): Unit =
actor ! ...
override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
actor ! akka.actor.Status.Success(code)
override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
actor ! ...
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
actor ! ...
override def onError(t: Throwable): Unit =
actor ! akka.actor.Status.Failure(t)
}
val websocket =
asyncHttpClient
.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
// Use Pong reply as indication that we've connected to the server
val p = Promise[Void]()
websocket.sendPingFrame().addListener(p)
val onStartKillSwitchFut = p.future.map(_ ⇒ onStartAction(websocket))
new KillSwitch {
override def shutdown(): Unit = {
onStartKillSwitchFut.map(_.foreach(_.shutdown()))
websocket.sendCloseFrame()
}
override def abort(ex: Throwable): Unit = {
onStartKillSwitchFut.map(_.foreach(_.abort(ex)))
websocket.sendCloseFrame()
}
}
}
}
如何使用 Source.asSubscriber
包装响应式监听器?我不明白它的好处。
我正在尝试为 asynchttpclient WebSocket 创建 Source[T]
。这是我的代码:
def createWsObservable(url: String, onStartAction: Option[WebSocket ⇒ Unit]): Source[WsMessage, KillSwitch] =
Source.asSubscriber[WsMessage].mapMaterializedValue { subs: Subscriber[WsMessage] ⇒
val listener: WebSocketListener = new WebSocketListener() {
override def onOpen(ws: WebSocket): Unit =
subs.onNext(WsOpen(ws))
override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
subs.onComplete()
override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onError(t: Throwable): Unit =
subs.onError(t)
override def onPongFrame(payload: Array[Byte]): Unit = {
super.onPingFrame(payload)
}
}
val websocket =
asyncHttpClient
.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
new KillSwitch {
override def shutdown(): Unit = websocket.sendCloseFrame()
override def abort(ex: Throwable): Unit = websocket.sendCloseFrame()
}
}
在第一个事件中我得到异常:
java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand
at akka.stream.impl.VirtualProcessor.rec(StreamLayout.scala:239)
at akka.stream.impl.VirtualProcessor.onNext(StreamLayout.scala:243)
at ingestion.NettyClientWrapper$$anon.onOpen(NettyClientWrapper.scala:55)
也许 Source.asSubscriber
对我来说不是个好选择?我应该怎么做才能将 reactivestreams Subscriber 包装到 akka 的 Source 中?
如果您有一个 Publisher
想要公开为 Source
,您可以使用 Source.fromPublisher
。
对于您的情况,第一个问题是如何从 AHC 获得 Publisher
。我不知道 AHC,但从我正在阅读的内容来看,您是否可以同时拥有 Reactive Streams 支持(这需要您将 StreamedAsyncHandler
传递给 execute
方法)和 WebSocket 支持(这需要 WebSocketUpgradeHandler
代替)。希望能把它们组合起来
我最终使用 Source.actorRef
和 KillSwitch
def createWsObservable(url: String, onStartAction: WebSocket ⇒ Option[KillSwitch] = _ ⇒ None, bufferSize: Int = 32): Source[WsMessage, KillSwitch] = {
val actorSource = Source.actorRef[WsMessage](bufferSize, OverflowStrategy.fail)
actorSource.mapMaterializedValue { actor ⇒
val listener: WebSocketListener = new WebSocketListener() {
override def onOpen(ws: WebSocket): Unit =
actor ! ...
override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
actor ! akka.actor.Status.Success(code)
override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
actor ! ...
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
actor ! ...
override def onError(t: Throwable): Unit =
actor ! akka.actor.Status.Failure(t)
}
val websocket =
asyncHttpClient
.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
// Use Pong reply as indication that we've connected to the server
val p = Promise[Void]()
websocket.sendPingFrame().addListener(p)
val onStartKillSwitchFut = p.future.map(_ ⇒ onStartAction(websocket))
new KillSwitch {
override def shutdown(): Unit = {
onStartKillSwitchFut.map(_.foreach(_.shutdown()))
websocket.sendCloseFrame()
}
override def abort(ex: Throwable): Unit = {
onStartKillSwitchFut.map(_.foreach(_.abort(ex)))
websocket.sendCloseFrame()
}
}
}
}