阿卡。如何关闭来自服务器的 websocket 连接?
Akka. How to close a websocket connection from server?
我正在学习 akka 框架,我正在尝试关闭来自服务器的 websocket 连接。
fun main() {
val system = ActorSystem.create("system")
val materializer = Materializer.createMaterializer(system)
val http = Http.get(system)
val routeFlow = path("ws") {
get {
parameter("as") { sys ->
parameter("id") { id ->
parameter("v") { v ->
handleWebSocketMessages(mainFlow(sys, id, v))
}
}
}
}
}.flow(system, materializer)
http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
println("Web socket server is running at localhost:8080")
}
}
fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{ Optional.empty() },
{ Optional.empty() },
100,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
我在这里做:
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
我检查了这个 。但它不起作用。我收到一个错误:
PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.
我做错了什么以及如何关闭它?
解决方案:
fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{
if (it is Done) {
Optional.of(CompletionStrategy.immediately())
} else {
Optional.empty()
}
},
{ Optional.empty() },
16,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(Done.done(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
服务器将在其上游完成时关闭连接。
之前的答案依赖于旧版本在 PoisonPill
附近的行为,这会触发完成。
对于 Source.actorRef
,第一个函数参数是完成匹配器:发送一条消息,该函数 returns 为非空 Optional
并且源将完成。 Optional
的内容是完成策略,它控制源是立即完成流还是耗尽其缓冲区。
我对 Kotlin 不太熟悉,所以我无法提供任何代码,但我认为 mapMaterializedValue
不是您要关闭连接的地方,因为该代码将在源代码之前执行有机会通过 websocket 发送任何消息。
我正在学习 akka 框架,我正在尝试关闭来自服务器的 websocket 连接。
fun main() {
val system = ActorSystem.create("system")
val materializer = Materializer.createMaterializer(system)
val http = Http.get(system)
val routeFlow = path("ws") {
get {
parameter("as") { sys ->
parameter("id") { id ->
parameter("v") { v ->
handleWebSocketMessages(mainFlow(sys, id, v))
}
}
}
}
}.flow(system, materializer)
http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
println("Web socket server is running at localhost:8080")
}
}
fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{ Optional.empty() },
{ Optional.empty() },
100,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
我在这里做:
.mapMaterializedValue {
it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}
我检查了这个
PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.
我做错了什么以及如何关闭它?
解决方案:
fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
val source = Source.actorRef<Message>(
{
if (it is Done) {
Optional.of(CompletionStrategy.immediately())
} else {
Optional.empty()
}
},
{ Optional.empty() },
16,
OverflowStrategy.fail())
.mapMaterializedValue {
it.tell(Done.done(), ActorRef.noSender())
}
val sink = Flow.create<Message>()
.map {
}
.to(Sink.onComplete {
})
return Flow.fromSinkAndSource(sink, source)
}
服务器将在其上游完成时关闭连接。
之前的答案依赖于旧版本在 PoisonPill
附近的行为,这会触发完成。
对于 Source.actorRef
,第一个函数参数是完成匹配器:发送一条消息,该函数 returns 为非空 Optional
并且源将完成。 Optional
的内容是完成策略,它控制源是立即完成流还是耗尽其缓冲区。
我对 Kotlin 不太熟悉,所以我无法提供任何代码,但我认为 mapMaterializedValue
不是您要关闭连接的地方,因为该代码将在源代码之前执行有机会通过 websocket 发送任何消息。