为网络套接字创建流时如何连接输入和输出?

How to wire input and output when creating flow for web socket?

假设我有一个 actor UserActor 知道如何处理传入消息以及如何发送新消息,我想在 Akka-Http 中处理网络套接字,所以我创建了 Flow[Message, Message, NotUsed]

这里我们获取新消息 JSON 并将它们发送到 UserActor。源完成后,我收到 SourceDied 消息:

    val incomingMessages: Sink[Message, NotUsed] =
      Flow[Message]
        .mapAsync(1) {
          case TextMessage.Strict(text)  => Future.successful(text)
          case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
        }
        .map(decode[IncomingMessage])
        .collect { case Right(msg) => msg }
        .map(_.toMessage)
        .to(Sink.actorRef[ChatMessage](userActor, SourceDied))

在这里,我为我的 UserActor 注册 out,它将在此处发送消息:

    val outgoingMessages: Source[Message, NotUsed] =
      Source
        .actorRef[ChatMessage](20, OverflowStrategy.fail)
        .mapMaterializedValue { outActor =>
          userActor ! Connect(outActor)
          NotUsed
        }
        .map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
        .map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

但是,UserActor 是每个用户一个,每个用户可以同时打开多个套接字。所以我只是收集 outs 以设置在 UserActor 中并向每个人发送信息。效果不错。

但是当源向我发送终止消息时(SourceDied 在我的例子中),我不知道 out 这个 source 被分配给了哪个 - 我不能决定我应该通知哪个 out 完成,然后从我的 outs 集合中删除。

一个想法是更改您的 Flow 为每个连接采用唯一标识符:

def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
  val incomingMessages: Sink[Message, NotUsed] =
    ...
    .to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))

  val outgoingMessages: Source[Message, NotUsed] =
    Source
      .actorRef[ChatMessage](20, OverflowStrategy.fail)
      .mapMaterializedValue { outActor =>
        userActor ! Connect(connectionId, outActor)
        NotUsed
      }
      ...

  Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}

显然,您需要调整 SourceDiedConnect 消息以包含连接 ID(在本例中可以使用 java.util.UUID.randomUUID.toString 之类的内容生成)。然后在 UserActor 中,将 Set 替换为 Map,其键是连接 ID。使用 Map 将使您能够查找连接参与者并根据需要删除它们。

因此,正如我从@chunjef 的回答中了解到的那样,没有直接的方法可以做到这一点。我个人决定不使用随机 ID 生成,而是在套接字和 UserActor.

之间再使用一个 Actor

基本上,现在我有 SocketHandlerActor(userActor: ActorRef) 来替换套接字创建部分中的 UserActor。它只是连接到 UserActor 并在套接字和 UserActor.

之间发送所有消息

但是当 SocketHandlerActor 收到 SourceDied 消息时,它只是做一些事情然后用 PoisionPill 杀死 out 和自己。 UserActor 一收到 Termination 消息就将其从自己的 outs 列表中删除。