How to 'Chunk and Re-assemble' large messages in Reactive Kafka using Akka-Stream
When sending a large file with Kafka, is it possible to distribute it across partitions and then re-assemble it using Akka-Stream? As described in this presentation:
http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297
"chunking" 方面,即生产者,很容易使用 reactive kafka:
之类的东西来编写
case class LargeMessage(bytes: Seq[Byte], topic: String)

def messageToKafka(message: LargeMessage, maxMessageSize: Int) =
  Source.fromIterator(() => message.bytes.iterator)
    .via(Flow[Byte].grouped(maxMessageSize))  // split the payload into chunks of at most maxMessageSize bytes
    .via(Flow[Seq[Byte]].map(seq =>           // one ProducerRecord per chunk
      new ProducerRecord[String, Array[Byte]](message.topic, seq.toArray)))
    .runWith(Producer.plainSink(producerSettings))
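The snippet assumes a `producerSettings` value is already in scope. A minimal sketch of how it might be defined, assuming string keys, a byte-array value serializer, and a placeholder bootstrap address:

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

implicit val system: ActorSystem = ActorSystem("chunking-example") // hypothetical name
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Hypothetical settings: string keys, raw chunk bytes as values.
val producerSettings =
  ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    .withBootstrapServers("localhost:9092") // placeholder address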
"re-assembling",即消费者,可以用类似于the documentation:
的方式实现
val messageFut: Future[LargeMessage] =
  for {
    bytes <- Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
               .take(numberOfChunks)              // hypothetical count: the consumer must know how many chunks to expect
               .mapConcat(_.value().toIndexedSeq) // flatten each chunk record back into a stream of bytes
               .runWith(Sink.seq[Byte])
  } yield LargeMessage(bytes, topic)
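Both snippets leave the consumer-side setup implicit. A minimal sketch of `consumerSettings` and the remaining imports, mirroring the producer side (the group id and bootstrap address are placeholders):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

// Hypothetical settings: string keys, raw chunk bytes as values.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder address
    .withGroupId("large-message-reader")    // placeholder group id
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

Note that Kafka only guarantees ordering within a single partition, so if the chunks of one message are spread across partitions, the consumer would also need per-chunk sequence numbers (e.g. in the record key) to re-assemble them in order.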