使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)

Consume TCP stream and redirect it to another Sink (with Akka Streams)

我尝试 redirect/forward 使用 Akka 2.4.3 将 TCP 流传输到另一个接收器。 该程序应该打开一个服务器套接字,监听传入的连接,然后使用 tcp 流。我们的发件人没有 expect/accept 回复我们所以我们永远不会发回任何东西 - 我们只是消费流。在构造 tcp 流之后,我们需要将字节转换为更有用的东西并将其发送到 Sink。

到目前为止我尝试了以下方法,但我特别纠结于如何不将 tcp 数据包发送回发送方以及如何正确连接接收器的部分。

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

更新:将此添加到您的 build.sbt 文件以获得必要的 deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

handleWith 需要一个 Flow,即一个带有未连接的入口和未连接的出口的盒子。您实际上提供了 Source,因为您使用 to 操作将 FlowSink 连接起来。

我认为您可以执行以下操作:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)

另一种(在我看来更清晰)编码方式 (AKKA 2.6.x),这也将强调一个事实,即您不进行任何出站流量,将是:

val receivingPipeline = Flow
  .via(framing)
  .via(decoder)
  .to(mySink)

val sendingNothing = Source.never[ByteString]()

conn.handleWith(Flow.fromSinkAndSourceCoupled(receivingPiline, sendingNothing))