我如何创建一个只使用 akka 流消费消息的 TCP 接收器?

How do i create a TCP receiver that only consumes messages using akka streams?

我们在:akka-stream-experimental_2.11 1.0.

灵感来自 example

我们写了一个TCP接收器如下:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val serverFlow = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(message => {
        target ? new Message(message); ByteString.empty
      })
      conn handleWith serverFlow
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

但是,我们的意图是让接收方根本不响应,只接收消息。 (TCP 消息发布者不关心响应)。

这可能吗?完全不响应,因为 akka.stream.scaladsl.Tcp.IncomingConnection 采用类型流:Flow[ByteString, ByteString, Unit]

如果是,一些指导将不胜感激。提前致谢。

下面的一次尝试通过了我的单元测试,但不确定它是否是最好的主意:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>

      val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))

      val targetSink = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(Message(_))
        .to(Sink(targetSubscriber))

      conn.flow.to(targetSink).runWith(Source(Promise().future))
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

你走在正确的轨道上。为了保持在某个时候关闭连接的可能性,您可能希望遵守承诺并在以后完成它。一旦完成了一个元素,这个元素就由源发布。但是,由于您不希望在连接上发布 any 元素,您可以使用 drop(1) 来确保源永远不会发出任何元素。

这是您示例的更新版本(未经测试):

val promise = Promise[ByteString]()
// this source will complete when the promise is fulfilled
// or it will complete with an error if the promise is completed with an error
val completionSource = Source(promise.future).drop(1)

completionSource  // only used to complete later
  .via(conn.flow) // I reordered the flow for better readability (arguably)
  .runWith(targetSink)

// to close the connection later complete the promise:
def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped

// alternatively to fail the connection later, complete with an error
def failConnection() = promise.failure(new RuntimeException)