为网络套接字创建流时如何连接输入和输出?
How to wire input and output when creating flow for web socket?
假设我有一个 actor UserActor
知道如何处理传入消息以及如何发送新消息,我想在 Akka-Http 中处理网络套接字,所以我创建了 Flow[Message, Message, NotUsed]
。
这里我们获取新消息 JSON 并将它们发送到 UserActor
。源完成后,我收到 SourceDied
消息:
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
}
.map(decode[IncomingMessage])
.collect { case Right(msg) => msg }
.map(_.toMessage)
.to(Sink.actorRef[ChatMessage](userActor, SourceDied))
在这里,我为我的 UserActor
注册 out
,它将在此处发送消息:
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(outActor)
NotUsed
}
.map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
.map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
但是,UserActor 是每个用户一个,每个用户可以同时打开多个套接字。所以我只是收集 outs
以设置在 UserActor
中并向每个人发送信息。效果不错。
但是当源向我发送终止消息时(SourceDied
在我的例子中),我不知道 out
这个 source
被分配给了哪个 - 我不能决定我应该通知哪个 out
完成,然后从我的 outs
集合中删除。
一个想法是更改您的 Flow
为每个连接采用唯一标识符:
def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
val incomingMessages: Sink[Message, NotUsed] =
...
.to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(connectionId, outActor)
NotUsed
}
...
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
显然,您需要调整 SourceDied
和 Connect
消息以包含连接 ID(在本例中可以使用 java.util.UUID.randomUUID.toString
之类的内容生成)。然后在 UserActor
中,将 Set
替换为 Map
,其键是连接 ID。使用 Map
将使您能够查找连接参与者并根据需要删除它们。
因此,正如我从@chunjef 的回答中了解到的那样,没有直接的方法可以做到这一点。我个人决定不使用随机 ID 生成,而是在套接字和 UserActor
.
之间再使用一个 Actor
基本上,现在我有 SocketHandlerActor(userActor: ActorRef)
来替换套接字创建部分中的 UserActor
。它只是连接到 UserActor
并在套接字和 UserActor
.
之间发送所有消息
但是当 SocketHandlerActor
收到 SourceDied
消息时,它只是做一些事情然后用 PoisionPill
杀死 out
和自己。 UserActor
一收到 Termination
消息就将其从自己的 outs
列表中删除。
假设我有一个 actor UserActor
知道如何处理传入消息以及如何发送新消息,我想在 Akka-Http 中处理网络套接字,所以我创建了 Flow[Message, Message, NotUsed]
。
这里我们获取新消息 JSON 并将它们发送到 UserActor
。源完成后,我收到 SourceDied
消息:
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
}
.map(decode[IncomingMessage])
.collect { case Right(msg) => msg }
.map(_.toMessage)
.to(Sink.actorRef[ChatMessage](userActor, SourceDied))
在这里,我为我的 UserActor
注册 out
,它将在此处发送消息:
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(outActor)
NotUsed
}
.map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
.map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
但是,UserActor 是每个用户一个,每个用户可以同时打开多个套接字。所以我只是收集 outs
以设置在 UserActor
中并向每个人发送信息。效果不错。
但是当源向我发送终止消息时(SourceDied
在我的例子中),我不知道 out
这个 source
被分配给了哪个 - 我不能决定我应该通知哪个 out
完成,然后从我的 outs
集合中删除。
一个想法是更改您的 Flow
为每个连接采用唯一标识符:
def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
val incomingMessages: Sink[Message, NotUsed] =
...
.to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(connectionId, outActor)
NotUsed
}
...
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
显然,您需要调整 SourceDied
和 Connect
消息以包含连接 ID(在本例中可以使用 java.util.UUID.randomUUID.toString
之类的内容生成)。然后在 UserActor
中,将 Set
替换为 Map
,其键是连接 ID。使用 Map
将使您能够查找连接参与者并根据需要删除它们。
因此,正如我从@chunjef 的回答中了解到的那样,没有直接的方法可以做到这一点。我个人决定不使用随机 ID 生成,而是在套接字和 UserActor
.
基本上,现在我有 SocketHandlerActor(userActor: ActorRef)
来替换套接字创建部分中的 UserActor
。它只是连接到 UserActor
并在套接字和 UserActor
.
但是当 SocketHandlerActor
收到 SourceDied
消息时,它只是做一些事情然后用 PoisionPill
杀死 out
和自己。 UserActor
一收到 Termination
消息就将其从自己的 outs
列表中删除。