Akka Stream custom graph stage

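I have an Akka stream coming from a websocket, like , and I want to build a reusable graph stage: an inlet for the stream, a FlowShape stage that adds an extra field to the JSON specifying the origin, i.e.

{
...,
"origin":"blockchain.info"
}

and an outlet to Kafka.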

I am facing the following three problems:

The sample project (flow only) looks like this:

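import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketRequest}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// The dispatcher import must come after `system` has been defined
import system.dispatcher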

val incoming: Sink[Message, Future[Done]] =
    Flow[Message].mapAsync(4) {
      case message: TextMessage.Strict =>
        println(message.text)
        Future.successful(Done)
      case message: TextMessage.Streamed =>
        message.textStream.runForeach(println)
      case message: BinaryMessage =>
        message.dataStream.runWith(Sink.ignore)
    }.toMat(Sink.last)(Keep.right)

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe[Message])(Keep.right)

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both)
      // TODO not working integrating kafka here
      // .map(_.toString)
      //    .map { elem =>
      //      println(s"PlainSinkProducer produce: ${elem}")
      //      new ProducerRecord[Array[Byte], String]("topic1", elem)
      //    }
      //    .runWith(Producer.plainSink(producerSettings))
      .run()

val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      // Terminate first: any statement after a `throw` would never run
      system.terminate()
      Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
    }
  }

// kafka that works / writes dummy data
val done1 = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

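One problem is the incoming stage, which is modeled as a Sink. It should be modeled as a Flow, so that its output can then be fed into Kafka.

Incoming text messages can be Streamed, so each one has to be consumed. The version below uses mapAsync to fold every text message into a single String, which keeps complete messages in memory; that is needed anyway to add a field to their JSON. Using flatMapMerge instead would avoid buffering whole (potentially large) messages, but it would emit message fragments rather than complete messages: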

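  import akka.NotUsed

  // Turns every websocket message into one complete String; binary messages
  // are drained and dropped. Needs an implicit Materializer and ExecutionContext.
  val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
    case msg: BinaryMessage =>
      // Drain the payload so the connection is not back-pressured
      msg.dataStream.runWith(Sink.ignore)
      Future.successful(None)
    case TextMessage.Strict(text) =>
      Future.successful(Some(text))
    case TextMessage.Streamed(src) =>
      // Concatenate the chunks of a streamed message
      src.runFold("")(_ + _).map { msg => Some(msg) }
  }.collect {
    case Some(msg) => msg
  }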

At this point you have something that produces Strings and can be connected to Kafka:

  val addOrigin: Flow[String, String, NotUsed] = ???
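One possible way to fill in the addOrigin placeholder, as a minimal sketch: it assumes every message is a single JSON object and uses spray-json (for example via the akka-http-spray-json module), which is an assumption and not part of the original setup. The names OriginTagging and withOrigin are hypothetical.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import spray.json._

// Hypothetical helper object, not part of any library.
object OriginTagging {
  // Parses `json` as a JSON object and adds (or overwrites) an "origin" field.
  // Throws if the text is not valid JSON or is not a JSON object.
  def withOrigin(json: String, origin: String): String = {
    val obj = json.parseJson.asJsObject
    JsObject(obj.fields + ("origin" -> JsString(origin))).compactPrint
  }

  // Reusable stage: every element passing through gets the origin field.
  def addOrigin(origin: String): Flow[String, String, NotUsed] =
    Flow[String].map(withOrigin(_, origin))
}
```

With this, the placeholder becomes val addOrigin = OriginTagging.addOrigin("blockchain.info"). A message that is not a JSON object makes withOrigin throw, which fails the whole stream; if such messages should be dropped instead, wrap the call in scala.util.Try inside mapConcat.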

  val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .via(incoming)
      .via(addOrigin)
      .map { elem =>
        println(s"PlainSinkProducer produce: ${elem}")
        new ProducerRecord[Array[Byte], String]("topic1", elem)
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.both)
      .run()
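Since the question asks for a reusable graph stage, here is a sketch of the same origin tagging written as a custom GraphStage with a FlowShape. It again assumes spray-json for the JSON handling, and the class name AddOrigin is hypothetical. For a stateless, element-by-element transformation like this one, Flow[String].map is usually enough; a custom stage mainly pays off when you need internal state or finer control over demand.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import spray.json._

// Hypothetical custom stage: one JSON object in, the same object with an
// "origin" field added out.
final class AddOrigin(origin: String) extends GraphStage[FlowShape[String, String]] {
  val in: Inlet[String] = Inlet[String]("AddOrigin.in")
  val out: Outlet[String] = Outlet[String]("AddOrigin.out")
  override val shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // An element arrived (downstream has already pulled): tag it and emit it.
      // A parse error thrown here fails the stage.
      override def onPush(): Unit = {
        val obj = grab(in).parseJson.asJsObject
        push(out, JsObject(obj.fields + ("origin" -> JsString(origin))).compactPrint)
      }

      // Downstream wants an element: request one from upstream.
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
```

It can replace addOrigin in the pipeline above, e.g. .via(new AddOrigin("blockchain.info")), because via accepts any Graph with a matching FlowShape.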