Websocket - Sink.actorRefWithAck 和 Source.queue - 只处理一个 TO 服务器请求?
Websocket - Sink.actorRefWithAck and Source.queue - only one request TO server gets processed?
考虑一下
def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
log.info("Handling byte-message")
ActorFlow.actorRef
{
out => MyActor.props(out)
}
}
每当将字节消息发送到 websocket 时,它都会在我获得日志条目之前委托给参与者。
工作正常。
现在相同的逻辑,用 Flow 代替
def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
{
log.info("Handling byte-message")
Flow.fromSinkAndSource(sink, source).log("flow")
}
}
我将添加其余代码:
val tickingSource: Source[Array[Byte], Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)
val myActor = system.actorOf(Props{new MyActor(null)}, "myActor")
val serverMessageSource = Source
.queue[Array[Byte]](10, OverflowStrategy.backpressure)
.mapMaterializedValue { queue => myActor ! InitTunnel(queue)}
val sink = Sink.actorRefWithAck(myActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
val source = tickingSource.merge(serverMessageSource)
它有一个 keepAlive 源和一个实际源,如果服务器想要推送一些东西,合并。
沉沦又是演员
现在的问题是,在这种情况下,我从客户端收到 EXACTLY 一条消息到服务器,即使它发送了更多消息,也不会传递给 myActor
一开始我以为这可能是因为这里把null
引用传给了myActor
,结果第一个也处理不了。我没有想法,这是什么原因造成的。流程本身有效,我收到 keepAlive 消息就好了,如果我再次刷新客户端 (Scala.js),第一个请求会很好地发送到服务器并且服务器响应,一切都很好
编辑澄清:
我不是在这里谈论日志条目 - 对不起,我在 myActor
中有另一个日志条目,让自己感到困惑。
如果客户端发送多条消息,服务器不处理。它永远不会到达演员,尽管客户肯定会发送它:(
我的期望:
1) 从客户端到服务器的第一个消息,websocket 被创建
2) websocket 由服务器通过 tickingSource
保持活动状态(这确实有效!)
3) 如果客户端发送另一个请求,它将由 myActor
处理,并且也会通过 websocket
响应客户端
因此,3) 不起作用。事实上,客户端发送了一条消息,但在初始消息之后从未到达 myActor
:(
编辑:
这是我在 myActor
中初始化 websocket/stream 的 actor 逻辑:
var tunnel: Option[SourceQueueWithComplete[Array[Byte]]] = None
override def receive: Receive = {
case i: InternalMessages.InitTunnel =>
log.info("Initializing tunnel")
tunnel = Some(i.sourceQueue)
case _: InternalMessages.Init =>
sender() ! InternalMessages.Acknowledged()
log.info("websocket stream initialized")
case _: InternalMessages.Completed =>
log.info("websocket stream completed")
case q: Question => {
tunnel match {
case Some(t) => t offer Answer()...
case None => log.error("No tunnel available")
}
}
}
object InternalMessages {
case class Acknowledged()
case class Init()
case class Completed()
case class InitTunnel(sourceQueue: SourceQueueWithComplete[Array[Byte]])
}
我觉得你在收到 Question
消息后没有发送 acks,但你应该像 akka 文档所说的那样 (http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#sink-actorrefwithack): It also requires the given acknowledgement message after each stream element to make back-pressure work.
我在 Java 中遇到了几乎相同的问题。但是消息根本没有发送到 "actorRefWithAck"(只收到了 onInitMessage)。参与者是远程的并且正在发送 "Acknowledged" 消息,该消息与 Sink.actorRefWithAck() 方法中的实例不同。向消息添加 equals 方法解决了问题。
@Override
public boolean equals(Object obj) {
return obj.getClass().equals(getClass());
}
考虑一下
def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
log.info("Handling byte-message")
ActorFlow.actorRef
{
out => MyActor.props(out)
}
}
每当将字节消息发送到 websocket 时,它都会在我获得日志条目之前委托给参与者。
工作正常。
现在相同的逻辑,用 Flow 代替
def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
request =>
{
log.info("Handling byte-message")
Flow.fromSinkAndSource(sink, source).log("flow")
}
}
我将添加其余代码:
val tickingSource: Source[Array[Byte], Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)
val myActor = system.actorOf(Props{new MyActor(null)}, "myActor")
val serverMessageSource = Source
.queue[Array[Byte]](10, OverflowStrategy.backpressure)
.mapMaterializedValue { queue => myActor ! InitTunnel(queue)}
val sink = Sink.actorRefWithAck(myActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
val source = tickingSource.merge(serverMessageSource)
它有一个 keepAlive 源和一个实际源,如果服务器想要推送一些东西,合并。
沉沦又是演员
现在的问题是,在这种情况下,我从客户端收到 EXACTLY 一条消息到服务器,即使它发送了更多消息,也不会传递给 myActor
一开始我以为这可能是因为这里把null
引用传给了myActor
,结果第一个也处理不了。我没有想法,这是什么原因造成的。流程本身有效,我收到 keepAlive 消息就好了,如果我再次刷新客户端 (Scala.js),第一个请求会很好地发送到服务器并且服务器响应,一切都很好
编辑澄清:
我不是在这里谈论日志条目 - 对不起,我在 myActor
中有另一个日志条目,让自己感到困惑。
如果客户端发送多条消息,服务器不处理。它永远不会到达演员,尽管客户肯定会发送它:(
我的期望:
1) 从客户端到服务器的第一个消息,websocket 被创建
2) websocket 由服务器通过 tickingSource
保持活动状态(这确实有效!)
3) 如果客户端发送另一个请求,它将由 myActor
处理,并且也会通过 websocket
因此,3) 不起作用。事实上,客户端发送了一条消息,但在初始消息之后从未到达 myActor
:(
编辑:
这是我在 myActor
中初始化 websocket/stream 的 actor 逻辑:
var tunnel: Option[SourceQueueWithComplete[Array[Byte]]] = None
override def receive: Receive = {
case i: InternalMessages.InitTunnel =>
log.info("Initializing tunnel")
tunnel = Some(i.sourceQueue)
case _: InternalMessages.Init =>
sender() ! InternalMessages.Acknowledged()
log.info("websocket stream initialized")
case _: InternalMessages.Completed =>
log.info("websocket stream completed")
case q: Question => {
tunnel match {
case Some(t) => t offer Answer()...
case None => log.error("No tunnel available")
}
}
}
object InternalMessages {
case class Acknowledged()
case class Init()
case class Completed()
case class InitTunnel(sourceQueue: SourceQueueWithComplete[Array[Byte]])
}
我觉得你在收到 Question
消息后没有发送 acks,但你应该像 akka 文档所说的那样 (http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#sink-actorrefwithack): It also requires the given acknowledgement message after each stream element to make back-pressure work.
我在 Java 中遇到了几乎相同的问题。但是消息根本没有发送到 "actorRefWithAck"(只收到了 onInitMessage)。参与者是远程的并且正在发送 "Acknowledged" 消息,该消息与 Sink.actorRefWithAck() 方法中的实例不同。向消息添加 equals 方法解决了问题。
@Override
public boolean equals(Object obj) {
return obj.getClass().equals(getClass());
}