Akka Stream 持续消费 websocket
Akka Stream continuously consume websocket
我对 Scala 和 Akka Stream 有点陌生,我正在尝试从 websocket 获取 JSON 字符串消息并将它们推送到 Kafka 主题。
现在我只处理“从 ws 获取消息”部分。
来自 websocket 的消息如下所示:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
我想将此 JSON 消息拆分为多条消息:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
然后将这些消息中的每一个推送到 kafka 主题。
这是我到目前为止取得的成就:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
它正在工作,我得到了预期的输出 Json 消息,但我想知道我是否可以用更“Akka-ish”的风格编写这个制作人,比如使用 GraphDSL。所以我有几个问题:
- 是否可以使用 GraphDSL 连续使用 WebSocket?如果是的话,你能给我举个例子吗?
- 使用 GraphDSL 使用 WS 是个好主意吗?
- 在将收到的 Json 消息发送到 kafka 之前,我是否应该像我所做的那样对其进行分解?或者最好按原样发送以降低延迟?
- 在向 Kafka 生成消息后,我打算使用 Apache Storm 使用它,这是个好主意吗?还是我应该坚持使用 Akka?
感谢阅读我,问候,
阿雷斯
该代码很像 Akka:scaladsl
与 GraphDSL
或实现自定义 GraphStage
一样是 Akka。 IMO/E 转到 GraphDSL
的唯一原因是如果图形的实际形状在 scaladsl
.
中不容易表达
我个人会采用定义 CoinPrice
class 的方式来使模型显式化
case class CoinPrice(coin: String, price: BigDecimal)
然后有一个 Flow[Message, CoinPrice, NotUsed]
将 1 条传入消息解析为零个或多个 CoinPrice
。像这样的东西(在这里使用 Play JSON):
val toCoinPrices =
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString)
.asOpt[JsObject]
.toList
.flatMap { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
}
根据消息中 JSON 的大小,您可能希望将其分成不同的流阶段,以允许 JSON 解析和提取到 CoinPrice
s。例如,
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString).asOpt[JsObject].toList
}
.async
.mapConcat { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
在上面,async
边界两侧的阶段将在不同的参与者中执行,因此可能同时执行(如果有足够的 CPU 核心可用等),代价是参与者协调和交换消息的额外开销。额外的 coordination/communication 开销(参见 Gunther 的通用可伸缩性法则)只有在 JSON 对象足够大并且进入速度足够快(始终在前一个对象完成处理之前进入)的情况下才值得).
如果您打算在程序停止之前使用 websocket,您可能会发现使用 Source.never[Message]
.
会更清楚
我对 Scala 和 Akka Stream 有点陌生,我正在尝试从 websocket 获取 JSON 字符串消息并将它们推送到 Kafka 主题。
现在我只处理“从 ws 获取消息”部分。
来自 websocket 的消息如下所示:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
我想将此 JSON 消息拆分为多条消息:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
然后将这些消息中的每一个推送到 kafka 主题。
这是我到目前为止取得的成就:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
它正在工作,我得到了预期的输出 Json 消息,但我想知道我是否可以用更“Akka-ish”的风格编写这个制作人,比如使用 GraphDSL。所以我有几个问题:
- 是否可以使用 GraphDSL 连续使用 WebSocket?如果是的话,你能给我举个例子吗?
- 使用 GraphDSL 使用 WS 是个好主意吗?
- 在将收到的 Json 消息发送到 kafka 之前,我是否应该像我所做的那样对其进行分解?或者最好按原样发送以降低延迟?
- 在向 Kafka 生成消息后,我打算使用 Apache Storm 使用它,这是个好主意吗?还是我应该坚持使用 Akka?
感谢阅读我,问候, 阿雷斯
该代码很像 Akka:scaladsl
与 GraphDSL
或实现自定义 GraphStage
一样是 Akka。 IMO/E 转到 GraphDSL
的唯一原因是如果图形的实际形状在 scaladsl
.
我个人会采用定义 CoinPrice
class 的方式来使模型显式化
case class CoinPrice(coin: String, price: BigDecimal)
然后有一个 Flow[Message, CoinPrice, NotUsed]
将 1 条传入消息解析为零个或多个 CoinPrice
。像这样的东西(在这里使用 Play JSON):
val toCoinPrices =
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString)
.asOpt[JsObject]
.toList
.flatMap { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
}
根据消息中 JSON 的大小,您可能希望将其分成不同的流阶段,以允许 JSON 解析和提取到 CoinPrice
s。例如,
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString).asOpt[JsObject].toList
}
.async
.mapConcat { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
在上面,async
边界两侧的阶段将在不同的参与者中执行,因此可能同时执行(如果有足够的 CPU 核心可用等),代价是参与者协调和交换消息的额外开销。额外的 coordination/communication 开销(参见 Gunther 的通用可伸缩性法则)只有在 JSON 对象足够大并且进入速度足够快(始终在前一个对象完成处理之前进入)的情况下才值得).
如果您打算在程序停止之前使用 websocket,您可能会发现使用 Source.never[Message]
.