阿卡。如何关闭来自服务器的 websocket 连接?

Akka. How to close a websocket connection from server?

我正在学习 akka 框架,我正在尝试关闭来自服务器的 websocket 连接。

fun main() {

    val system = ActorSystem.create("system")
    val materializer = Materializer.createMaterializer(system)

    val http = Http.get(system)

    val routeFlow = path("ws") {
        get {
            parameter("as") { sys ->
                parameter("id") { id ->
                    parameter("v") { v ->
                        handleWebSocketMessages(mainFlow(sys, id, v))
                    }
                }
            }
        }
    }.flow(system, materializer)

    http.newServerAt("0.0.0.0", 8080).withMaterializer(materializer).bindFlow(routeFlow).thenRun {
        println("Web socket server is running at localhost:8080")
    }
}

fun mainFlow(sys: String, id: String, v: String): Flow<Message, Message, NotUsed> {
    val source = Source.actorRef<Message>(
        { Optional.empty() },
        { Optional.empty() },
        100,
        OverflowStrategy.fail())
        .mapMaterializedValue {
            it.tell(PoisonPill.getInstance(), ActorRef.noSender())
        }

    val sink = Flow.create<Message>()
        .map {
            
        }
        .to(Sink.onComplete {
            
        })

    return Flow.fromSinkAndSource(sink, source)
}

我在这里做:

.mapMaterializedValue {
    it.tell(PoisonPill.getInstance(), ActorRef.noSender())
}

我检查了这个 。但它不起作用。我收到一个错误:

PoisonPill message sent to StageActor(akka://system/system/Materializers/StreamSupervisor-1/$$c-actorRefSource) will be ignored, since it is not a real Actor.Use a custom message type to communicate with it instead.

我做错了什么以及如何关闭它?

解决方案:

fun mainFlow(connectionsController: ActorRef, sys: String, id: String, v: String, system: ActorSystem): Flow<Message, Message, NotUsed> {
    val source = Source.actorRef<Message>(
        {
            if (it is Done) {
                Optional.of(CompletionStrategy.immediately())
            } else {
                Optional.empty()
            }
        },
        { Optional.empty() },
        16,
        OverflowStrategy.fail())
        .mapMaterializedValue {
            it.tell(Done.done(), ActorRef.noSender())
        }

    val sink = Flow.create<Message>()
        .map {
            
        }
        .to(Sink.onComplete {
            
        })

    return Flow.fromSinkAndSource(sink, source)
}

服务器将在其上游完成时关闭连接。

之前的答案依赖于旧版本在 PoisonPill 附近的行为,这会触发完成。

对于 Source.actorRef,第一个函数参数是完成匹配器:发送一条消息,该函数 returns 为非空 Optional 并且源将完成。 Optional 的内容是完成策略,它控制源是立即完成流还是耗尽其缓冲区。

我对 Kotlin 不太熟悉,所以我无法提供任何代码,但我认为 mapMaterializedValue 不是您要关闭连接的地方,因为该代码将在源代码之前执行有机会通过 websocket 发送任何消息。