如何为每个用户获取 Akka Http Websocket 连接的终止开关?
How to get a kill switch per user for Akka Http Websocket connection?
我是 Akka 和 Scala 的新手,自学这个是为了用 websockets 做一个小项目。最终目标很简单,做一个基本的聊天服务器,在一些网页上发布和订阅消息。
事实上,在仔细阅读他们的文档后,我已经找到了与我的目标相关的页面,即 this and this。
使用动态连接(又名 MergeHub 和 BroadcastHub)和 Flow.fromSinkAndSource() 方法,我能够实现我想要的一个非常基本的示例。我们甚至可以使用我在下面显示的 akka 文档中的示例获得终止开关。代码如下:
private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
val (sink, source) = MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
Flow.fromSinkAndSourceCoupled(sink, source).joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
}
但是,我现在看到一个问题。以上将 return Akka 的 websocket 指令使用的流程:akka.http.scaladsl.server.Directives.handleWebSocketMessages(/* FLOW GOES HERE */)
这意味着只要我为它提供处理程序,akka 代码本身就会为我实现这个流程。
但假设我想通过 KillSwitch 任意终止一个用户的连接(可能是因为他们的会话在我的应用程序上已过期)。虽然用户的 websocket 将通过上述处理程序添加,但由于我的代码不会明确具体化该流程,因此我无法访问 KillSwitch。因此,我无法终止连接,只有用户离开网页时才能终止连接。
令我感到奇怪的是,文档会提到 kill switch 方法而没有说明我将如何使用 websocket api。
任何人都可以就如何获得每个连接的终止开关提出解决方案吗?我对这应该如何工作有根本的误解吗?
提前致谢。
我很高兴地说,经过大量时间、研究和编码,我对这个问题有了答案。为了做到这一点,我不得不在 Akka Gitter 和 Lightbend 论坛中 post。请参考我得到的惊人答案 there 以了解对问题的一些看法和一些解决方案。我会在这里总结一下。
为了从我使用的代码中获取 UniqueKillSwitch,我需要在我正在 returning 的 Flow 上使用 mapMaterializeValue() 方法。这是我现在用来 return 到 handleWebSocketMessages
指令的流程的代码:
// note - state will not be updated properly if cancellation events come through from the client side as user->killswitch mapping may still remain in concurrent map even if the connections are closed
Flow.fromSinkAndSourceCoupled(mergeHubSink, broadcastHubSource)
.joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
.mapMaterializedValue { killSwitch =>
connections.put(user, killSwitch) // add kill switch in side effect once value is ready from materialization
NotUsed.notUsed()
}
以上代码存在于我创建的聊天室 class 中,它可以访问 mergehub 和广播中心物化接收器和源。它还可以访问并发哈希图,该哈希图将终止开关持久保存给用户。这样,我们现在可以通过地图查询来访问 Kill Switch。从那里,您可以调用 switch.shutdown() 从服务器端终止用户的连接。
我的主要问题是,我原本以为我可以直接获得开关,即使我没有控制实体化。这似乎不可能。当您知道需要您的 Flow 的调用者不关心物化值(也称为终止开关)时,我建议使用此方法。
请参考我链接的答案,了解更多场景和处理此问题的方法。
我是 Akka 和 Scala 的新手,自学这个是为了用 websockets 做一个小项目。最终目标很简单,做一个基本的聊天服务器,在一些网页上发布和订阅消息。
事实上,在仔细阅读他们的文档后,我已经找到了与我的目标相关的页面,即 this and this。
使用动态连接(又名 MergeHub 和 BroadcastHub)和 Flow.fromSinkAndSource() 方法,我能够实现我想要的一个非常基本的示例。我们甚至可以使用我在下面显示的 akka 文档中的示例获得终止开关。代码如下:
private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
val (sink, source) = MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
Flow.fromSinkAndSourceCoupled(sink, source).joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
}
但是,我现在看到一个问题。以上将 return Akka 的 websocket 指令使用的流程:akka.http.scaladsl.server.Directives.handleWebSocketMessages(/* FLOW GOES HERE */)
这意味着只要我为它提供处理程序,akka 代码本身就会为我实现这个流程。
但假设我想通过 KillSwitch 任意终止一个用户的连接(可能是因为他们的会话在我的应用程序上已过期)。虽然用户的 websocket 将通过上述处理程序添加,但由于我的代码不会明确具体化该流程,因此我无法访问 KillSwitch。因此,我无法终止连接,只有用户离开网页时才能终止连接。
令我感到奇怪的是,文档会提到 kill switch 方法而没有说明我将如何使用 websocket api。
任何人都可以就如何获得每个连接的终止开关提出解决方案吗?我对这应该如何工作有根本的误解吗?
提前致谢。
我很高兴地说,经过大量时间、研究和编码,我对这个问题有了答案。为了做到这一点,我不得不在 Akka Gitter 和 Lightbend 论坛中 post。请参考我得到的惊人答案 there 以了解对问题的一些看法和一些解决方案。我会在这里总结一下。
为了从我使用的代码中获取 UniqueKillSwitch,我需要在我正在 returning 的 Flow 上使用 mapMaterializeValue() 方法。这是我现在用来 return 到 handleWebSocketMessages
指令的流程的代码:
// note - state will not be updated properly if cancellation events come through from the client side as user->killswitch mapping may still remain in concurrent map even if the connections are closed
Flow.fromSinkAndSourceCoupled(mergeHubSink, broadcastHubSource)
.joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
.mapMaterializedValue { killSwitch =>
connections.put(user, killSwitch) // add kill switch in side effect once value is ready from materialization
NotUsed.notUsed()
}
以上代码存在于我创建的聊天室 class 中,它可以访问 mergehub 和广播中心物化接收器和源。它还可以访问并发哈希图,该哈希图将终止开关持久保存给用户。这样,我们现在可以通过地图查询来访问 Kill Switch。从那里,您可以调用 switch.shutdown() 从服务器端终止用户的连接。
我的主要问题是,我原本以为我可以直接获得开关,即使我没有控制实体化。这似乎不可能。当您知道需要您的 Flow 的调用者不关心物化值(也称为终止开关)时,我建议使用此方法。
请参考我链接的答案,了解更多场景和处理此问题的方法。