Akka Streams TCP socket client side termination

I have the following flow:

val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)

val targetSink = Flow[ByteString]
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, "Done"))

  sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))

The stream is consumed by an Actor (the one passed to Sink.actorRef). conn.flow is the flow of an incoming TCP connection obtained via Tcp().bind(address, port).

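Currently, when the TCP connection is closed from the client side, the Sink.actorRef Actor keeps running. Is there a way to detect that the client has terminated the TCP connection, so that I can shut down the Actor?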

Edit: I tried handling both cases as suggested:

case "Done" =>
  context.stop(self)

case akka.actor.Status.Failure(_) => // extractor pattern; a bare Status.Failure only matches the companion object
  context.stop(self)

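But when I test with a socket client and cancel it, the actor is not shut down. So it seems that neither the "Done" message nor the failure is delivered when the TCP connection terminates.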

The full code is as follows:

private var connection: Option[Tcp.IncomingConnection] = None
private var mqttpubsub: Option[ActorRef] = None
private var sourceRef: Option[ActorRef] = None

private val sdcTopic = "out"
private val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)

implicit private val system = context.system
implicit private val mat = ActorMaterializer.create(context.system)

override def receive: Receive = {

case conn: Tcp.IncomingConnection =>
  connection = Some(conn)

  mqttpubsub = Some(context.actorOf(Props(classOf[MqttPubSub], PSConfig(
    brokerUrl = "tcp://127.0.0.1:1883", // all params are optional except brokerUrl
    userName = null,
    password = null,
    // messages received while disconnected are stashed; messages older than stashTimeToLive are discarded
    stashTimeToLive = 1.minute,
    stashCapacity = 100000, // when the stash reaches this size, the first half of its elements is dropped
    reconnectDelayMin = 10.millis, //for fine tuning re-connection logic
    reconnectDelayMax = 30.seconds
  ))))

  val targetSink = Flow[ByteString]
    .alsoTo(Sink.foreach(println))
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, "Done"))

  sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))

case msg: MqttMessages.MqttMessage =>
  processMessage(msg)

case msg: Message =>
  val jsonMsg = JsonParser(msg.payload).asJsObject
  val mqttMsg = MqttMessages.jsonToObject(jsonMsg)

  try {
    sourceRef.foreach(_ ! ByteString(msg.payload))
  } catch {
    case e: Throwable => e.printStackTrace()
  }


case SubscribeAck(Subscribe(topic, self, qos), fail) =>

case "Done" =>
  context.stop(self)

case akka.actor.Status.Failure(_) => // extractor pattern; a bare Status.Failure only matches the companion object
  context.stop(self)
}

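"the Actor keeps running"

Which actor are you referring to, the one you registered with Sink.actorRef? If so, then to shut it down when the stream terminates you need to handle the "Done" and akka.actor.Status.Failure messages in it and call context.stop(self) explicitly. The "Done" message is sent when the stream completes successfully, whereas Status.Failure is sent if the stream fails with an error.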

For details, see the Sink.actorRef API documentation, which explains the termination semantics.
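As a minimal sketch of the handling described above: the actor below is the target of Sink.actorRef(self, "Done") and stops itself on either termination message. The class name ConnectionHandler is hypothetical and not taken from the question. One detail worth checking in your own code: Status.Failure has to be matched with an extractor pattern, because a bare `case akka.actor.Status.Failure =>` compares against the companion object and never matches an actual failure message.

```scala
import akka.actor.{Actor, Status}

// Hypothetical actor acting as the target of Sink.actorRef(self, "Done").
class ConnectionHandler extends Actor {
  override def receive: Receive = {
    // Sent by Sink.actorRef when the stream completes normally.
    case "Done" =>
      context.stop(self)

    // Sent by Sink.actorRef when the stream fails.
    // The (cause) extractor is required to match Failure instances.
    case Status.Failure(cause) =>
      context.system.log.error(cause, "Stream failed, stopping actor")
      context.stop(self)
  }
}
```

This only helps if the completion or failure actually reaches the sink, so every stage between conn.flow and Sink.actorRef has to propagate upstream completion.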

I ended up creating another stage that simply passes elements through, but emits an extra message downstream when the upstream completes:

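import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

// Pass-through stage that emits one extra marker element when upstream completes.
class TcpStage extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in = Inlet[ByteString]("TCPStage.in")
  val out = Outlet[ByteString]("TCPStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = push(out, grab(in))

      override def onUpstreamFinish(): Unit = {
        // emit (unlike push) waits until downstream has pulled, so the marker
        // is delivered even if `out` has no pending demand. The stage is
        // completed only after the marker has been emitted.
        emit(out, doneMarker, () => completeStage())
      }
    })

    // Empty JSON object used as the end-of-connection marker.
    private def doneMarker: ByteString = ByteString("{ }")
  }
}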

I then use it in my flow:

  val targetSink = Flow[ByteString]
    .via(new TcpStage())
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, MqttDone))

  sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))
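For comparison, here is a sketch of a different technique that avoids a custom stage: the built-in watchTermination operator, which materializes a Future[Done] that completes when the point in the stream it is attached to sees upstream completion or downstream cancellation, and fails if the stream fails. This is not the approach used above, only an alternative under the assumption that the output side of conn.flow terminates when the client closes the connection. The names TerminationSignal, ConnectionClosed, and notifyOnTermination are hypothetical.

```scala
import akka.actor.ActorRef
import akka.stream.scaladsl.Flow
import scala.concurrent.ExecutionContext

object TerminationSignal {
  // Hypothetical message sent to the listener once the flow has terminated.
  case object ConnectionClosed

  // Wraps `flow` so that, once materialized, `listener` is notified when the
  // flow's output terminates, whether by completion, cancellation, or failure.
  // The original materialized value of `flow` is kept unchanged.
  def notifyOnTermination[In, Out, Mat](flow: Flow[In, Out, Mat], listener: ActorRef)(
      implicit ec: ExecutionContext): Flow[In, Out, Mat] =
    flow.watchTermination() { (mat, done) =>
      // onComplete fires for both success and failure of the Future.
      done.onComplete(_ => listener ! ConnectionClosed)
      mat
    }
}
```

In the flow above this would replace `.via(conn.flow)` with `.via(TerminationSignal.notifyOnTermination(conn.flow, self)(context.dispatcher))`, and the actor would handle `case TerminationSignal.ConnectionClosed => context.stop(self)`. Unlike the marker element, this signal does not pass through JsonStage, so it arrives even if an intermediate stage swallows completion.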