Kafka - scala - processing multiple messages

Is it possible to send an array of strings through a Kafka Producer object? I want to take some messages from 'topic1' (lines of text), split them into single words, and send them to another topic.

    import akka.actor.ActorSystem
    import akka.kafka.scaladsl.{Consumer, Producer}
    import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
    import akka.stream.ActorMaterializer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

    object KafkaConsumer extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log


      // PRODUCER config
      val producerSettings = ProducerSettings(
        actorSystem,
        new ByteArraySerializer,
        new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // CONSUMER config
      val consumerSettings = ConsumerSettings(
        system = actorSystem,
        keyDeserializer = new ByteArrayDeserializer,
        valueDeserializer = new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("kafka-sample")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      // -----------------------------------------------------------------------//

      // ROUTE OF THE APP
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map { msg =>
          println(s"topic1 -> topic2: $msg")
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
            msg.committableOffset)
        }
        .runWith(Producer.commitableSink(producerSettings))
    }

You should use mapConcat before the map, because it will

Transform each input element into an Iterable of output elements that is then flattened into the output stream.

The complete added lines would be as follows:

    Subscriptions.topics("topic1"))
      .mapConcat { msg => msg.record.value().split(" ").toList }
      .map { ...
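
Note that this mapConcat emits plain words and drops the committable offset, so the map that follows can no longer attach an offset to the ProducerMessage. A minimal sketch of the complete pipeline (assuming akka-stream-kafka 0.14, the version used below) that keeps the offset by pairing it with each word:

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      // fan out: one element per word, each paired with its source message's offset
      .mapConcat { msg =>
        msg.record.value().split(" ").toList.map(word => (word, msg.committableOffset))
      }
      .map { case (word, offset) =>
        ProducerMessage.Message(
          new ProducerRecord[Array[Byte], String]("topic2", word),
          offset) // the same offset is committed once per word: redundant but harmless
      }
      .runWith(Producer.commitableSink(producerSettings))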

The Akka Streams example above creates a simple stream that reads one message, produces to Kafka through a Sink, and commits the offset of the consumed message. If you need to read one or more messages and produce as many messages as there are words in the consumed set, you should use the Akka Streams Graph API instead.

This example uses the Graph DSL, building a Source from Kafka and using groupedWithin to read a bunch of messages and extract the words they contain.
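
As a standalone illustration (a generic sketch, assuming scala.concurrent.duration._ and a materializer are in scope), groupedWithin pushes a batch downstream as soon as either the element count or the time limit is reached:

    // emits Vector(1,...,10), Vector(11,...,20), Vector(21,...,25):
    // a batch goes downstream after 10 elements or after 30 seconds,
    // whichever comes first
    Source(1 to 25)
      .groupedWithin(10, 30.seconds)
      .runForeach(println)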

Two simple flows are created: one to obtain the last offset to commit, and another to collect the words. Then a Source stage is created that broadcasts the messages consumed from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). With the runForeach function the messages are produced. Note that with Future.sequence the messages are not produced in order.
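
If ordering mattered, a sketch of the flatMap alternative (reusing the publishToKafka helper defined below; words is a hypothetical Seq[String]) chains the futures so each word is published only after the previous send completes:

    // sequential publishing: each send starts only after the previous one finishes
    words.foldLeft(Future.successful(())) { (acc, word) =>
      acc.flatMap(_ => publishToKafka("outTopic", word).map(_ => ()))
    }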

Although the example may look long, it compiles and works correctly using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
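In build.sbt that is the following dependency line (standard sbt syntax; nothing beyond what is stated above):

    libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"

The full example:
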
import java.util.Properties

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object SplitSource extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(actorSystem,
                                          new ByteArraySerializer,
                                          new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the Java producer's blocking send(...).get() in a Scala Future
  private def publishToKafka(id: String, data: String) = {
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }
  }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // It consumes 10 messages or waits 30 seconds to push downstream
      .groupedWithin(10, 30 seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one to obtain the last offset to commit
    // and another to return the Seq with the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())
    val flowCommit = Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that creates the list of all words in all consumed messages
    val flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => {
        input.map(_.record.value()).mkString(" ").split(" ")
      })

    // build the Stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> flowWords ~> zipResult.in1

    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { msgs =>
    {
      // Publish all words and, when all futures complete, commit the last Kafka offset
      // (note: publishToKafka uses its first argument as the record key)
      val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

      // Produces in parallel! Use flatMap instead to publish in order
      Future.sequence(futures).onComplete {
        case Success(_) =>
          // Once all futures are done, commit the offset of the last consumed message
          msgs._1.commitScaladsl()
        case Failure(ex) =>
          log.error(ex, "Failed to publish words, skipping the offset commit")
      }
    }
  }

}

The Akka Streams API allows you to create great processing pipelines.