Websocket - Sink.actorRefWithAck 和 Source.queue - 只处理一个 TO 服务器请求?

Websocket - Sink.actorRefWithAck and Source.queue - only one request TO server gets processed?

考虑一下

 def handle = WebSocket.accept[Array[Byte], Array[Byte]]
 {
     request =>
        log.info("Handling byte-message")
        ActorFlow.actorRef
        {
          out => MyActor.props(out)
        }
  }

每当将字节消息发送到 websocket 时,它都会在我获得日志条目之前委托给参与者。

工作正常。

现在相同的逻辑,用 Flow 代替

def handle = WebSocket.accept[Array[Byte], Array[Byte]]
{
     request =>
      {
        log.info("Handling byte-message")
        Flow.fromSinkAndSource(sink, source).log("flow")
      }
}

我将添加其余代码:

 val tickingSource: Source[Array[Byte], Cancellable] =
    Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
      .map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)

  val myActor = system.actorOf(Props{new MyActor(null)}, "myActor")

  val serverMessageSource = Source
    .queue[Array[Byte]](10, OverflowStrategy.backpressure)
    .mapMaterializedValue { queue => myActor ! InitTunnel(queue)}

  val sink = Sink.actorRefWithAck(myActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())

  val source = tickingSource.merge(serverMessageSource)

它有一个 keepAlive 源和一个实际源,如果服务器想要推送一些东西,合并。

沉沦又是演员

现在的问题是,在这种情况下,我从客户端收到 EXACTLY 一条消息到服务器,即使它发送了更多消息,也不会传递给 myActor

一开始我以为这可能是因为这里把null引用传给了myActor,结果第一个也处理不了。我没有想法,这是什么原因造成的。流程本身有效,我收到 keepAlive 消息就好了,如果我再次刷新客户端 (Scala.js),第一个请求会很好地发送到服务器并且服务器响应,一切都很好

编辑澄清:

不是在这里谈论日志条目 - 对不起,我在 myActor 中有另一个日志条目,让自己感到困惑。

如果客户端发送多条消息,服务器不处理。它永远不会到达演员,尽管客户肯定会发送它:(

我的期望:

1) 从客户端到服务器的第一个消息,websocket 被创建 2) websocket 由服务器通过 tickingSource 保持活动状态(这确实有效!) 3) 如果客户端发送另一个请求,它将由 myActor 处理,并且也会通过 websocket

响应客户端

因此,3) 不起作用。事实上,客户端发送了一条消息,但在初始消息之后从未到达 myActor :(

编辑:

这是我在 myActor 中初始化 websocket/stream 的 actor 逻辑:

var tunnel: Option[SourceQueueWithComplete[Array[Byte]]] = None

override def receive: Receive = {
   case i: InternalMessages.InitTunnel =>
      log.info("Initializing tunnel")
      tunnel = Some(i.sourceQueue)

    case _: InternalMessages.Init =>
      sender() ! InternalMessages.Acknowledged()
      log.info("websocket stream initialized")

    case _: InternalMessages.Completed =>
      log.info("websocket stream completed")

    case q: Question => {
        tunnel match {
           case Some(t) => t offer Answer()...
           case None => log.error("No tunnel available")
        }
    }
}


object InternalMessages {
  case class Acknowledged()
  case class Init()
  case class Completed()
  case class InitTunnel(sourceQueue: SourceQueueWithComplete[Array[Byte]])
}

我觉得你在收到 Question 消息后没有发送 acks,但你应该像 akka 文档所说的那样 (http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#sink-actorrefwithack): It also requires the given acknowledgement message after each stream element to make back-pressure work.

我在 Java 中遇到了几乎相同的问题。但是消息根本没有发送到 "actorRefWithAck"(只收到了 onInitMessage)。参与者是远程的并且正在发送 "Acknowledged" 消息,该消息与 Sink.actorRefWithAck() 方法中的实例不同。向消息添加 equals 方法解决了问题。

@Override
public boolean equals(Object obj) {
    return obj.getClass().equals(getClass());
}