Akka Stream 是一个自定义图形阶段
akka stream custom graph stage
我有一个来自网络套接字的 akka 流,例如 ,我想构建一个可重用的图形阶段(inlet
:流,FlowShape
:添加一个额外的字段到 JSON 指定来源,即
{
...,
"origin":"blockchain.info"
}
和 outlet
到卡夫卡。
我面临以下3个问题:
- 无法全神贯注地从 Web 套接字流创建自定义
Inlet
- 无法将kafka直接集成到stream中(见下面代码)
- 不确定添加附加字段的转换器是否需要反序列化 json 以添加 origin
示例项目(仅限流程)如下所示:
import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val incoming: Sink[Message, Future[Done]] =
Flow[Message].mapAsync(4) {
case message: TextMessage.Strict =>
println(message.text)
Future.successful(Done)
case message: TextMessage.Streamed =>
message.textStream.runForeach(println)
case message: BinaryMessage =>
message.dataStream.runWith(Sink.ignore)
}.toMat(Sink.last)(Keep.right)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
// TODO not working integrating kafka here
// .map(_.toString)
// .map { elem =>
// println(s"PlainSinkProducer produce: ${elem}")
// new ProducerRecord[Array[Byte], String]("topic1", elem)
// }
// .runWith(Producer.plainSink(producerSettings))
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
system.terminate
}
}
// kafka that works / writes dummy data
val done1 = Source(1 to 100)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
一个问题是围绕 incoming
阶段,它被建模为 Sink
。它应该被建模为 Flow
。随后将消息馈送到 Kafka。
因为传入的短信可以是 Streamed
。您可以按如下方式使用 flatMapMerge
组合器来避免将整个(可能很大的)消息存储在内存中的需要:
val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
case msg: BinaryMessage =>
msg.dataStream.runWith(Sink.ignore)
Future.successful(None)
case TextMessage.Streamed(src) =>
src.runFold("")(_ + _).map { msg => Some(msg) }
}.collect {
case Some(msg) => msg
}
此时你得到了一些可以产生字符串的东西,并且可以连接到 Kafka:
val addOrigin: Flow[String, String, NotUsed] = ???
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.via(incoming)
.via(addOrigin)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.toMat(Producer.plainSink(producerSettings))(Keep.both)
.run()
我有一个来自网络套接字的 akka 流,例如 inlet
:流,FlowShape
:添加一个额外的字段到 JSON 指定来源,即
{
...,
"origin":"blockchain.info"
}
和 outlet
到卡夫卡。
我面临以下3个问题:
- 无法全神贯注地从 Web 套接字流创建自定义
Inlet
- 无法将kafka直接集成到stream中(见下面代码)
- 不确定添加附加字段的转换器是否需要反序列化 json 以添加 origin
示例项目(仅限流程)如下所示:
import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val incoming: Sink[Message, Future[Done]] =
Flow[Message].mapAsync(4) {
case message: TextMessage.Strict =>
println(message.text)
Future.successful(Done)
case message: TextMessage.Streamed =>
message.textStream.runForeach(println)
case message: BinaryMessage =>
message.dataStream.runWith(Sink.ignore)
}.toMat(Sink.last)(Keep.right)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
// TODO not working integrating kafka here
// .map(_.toString)
// .map { elem =>
// println(s"PlainSinkProducer produce: ${elem}")
// new ProducerRecord[Array[Byte], String]("topic1", elem)
// }
// .runWith(Producer.plainSink(producerSettings))
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
system.terminate
}
}
// kafka that works / writes dummy data
val done1 = Source(1 to 100)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
一个问题是围绕 incoming
阶段,它被建模为 Sink
。它应该被建模为 Flow
。随后将消息馈送到 Kafka。
因为传入的短信可以是 Streamed
。您可以按如下方式使用 flatMapMerge
组合器来避免将整个(可能很大的)消息存储在内存中的需要:
val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
case msg: BinaryMessage =>
msg.dataStream.runWith(Sink.ignore)
Future.successful(None)
case TextMessage.Streamed(src) =>
src.runFold("")(_ + _).map { msg => Some(msg) }
}.collect {
case Some(msg) => msg
}
此时你得到了一些可以产生字符串的东西,并且可以连接到 Kafka:
val addOrigin: Flow[String, String, NotUsed] = ???
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.via(incoming)
.via(addOrigin)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.toMat(Producer.plainSink(producerSettings))(Keep.both)
.run()