Akka Stream 持续消费 websocket

Akka Stream continuously consume websocket

我对 Scala 和 Akka Stream 有点陌生,我正在尝试从 websocket 获取 JSON 字符串消息并将它们推送到 Kafka 主题。

现在我只处理“从 ws 获取消息”部分。

来自 websocket 的消息如下所示:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

我想将此 JSON 消息拆分为多条消息:

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

然后将这些消息中的每一个推送到 kafka 主题。

这是我到目前为止取得的成就:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

它正在工作,我得到了预期的输出 Json 消息,但我想知道我是否可以用更“Akka-ish”的风格编写这个制作人,比如使用 GraphDSL。所以我有几个问题:

感谢阅读我,问候, 阿雷斯

该代码很像 Akka:scaladslGraphDSL 或实现自定义 GraphStage 一样是 Akka。 IMO/E 转到 GraphDSL 的唯一原因是如果图形的实际形状在 scaladsl.

中不容易表达

我个人会采用定义 CoinPrice class 的方式来使模型显式化

case class CoinPrice(coin: String, price: BigDecimal)

然后有一个 Flow[Message, CoinPrice, NotUsed] 将 1 条传入消息解析为零个或多个 CoinPrice。像这样的东西(在这里使用 Play JSON):

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

根据消息中 JSON 的大小,您可能希望将其分成不同的流阶段,以允许 JSON 解析和提取到 CoinPrices。例如,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

在上面,async 边界两侧的阶段将在不同的参与者中执行,因此可能同时执行(如果有足够的 CPU 核心可用等),代价是参与者协调和交换消息的额外开销。额外的 coordination/communication 开销(参见 Gunther 的通用可伸缩性法则)只有在 JSON 对象足够大并且进入速度足够快(始终在前一个对象完成处理之前进入)的情况下才值得).

如果您打算在程序停止之前使用 websocket,您可能会发现使用 Source.never[Message].

会更清楚