Serial consumption of Kafka topics from Spark

Given the following code:

def createKafkaStream(ssc: StreamingContext, 
                      kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    // some configs here: derive the topic set and broker list from the arguments
    val topicsSet = kafkaTopics.split(",").toSet
    val props = Map[String, String]("metadata.broker.list" -> brokers)
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.foreach { msg =>
            // Now do some DataFrame-intensive work.
            // As I understand things, DataFrame ops must be run
            // on Workers as well as streaming consumers.
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

My understanding is that Spark and Kafka will automatically work together to figure out how many consumer threads to deploy to the available worker nodes, which likely results in the Kafka topic's messages being processed in parallel.

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from the topic, processing it completely, and then starting back over again and polling for the next message?

Also, when I call:

val ssc = new StreamingContext(sc, Seconds(10))

Does that mean that a single consumer thread will receive all messages that were published to the topic in the last 10 seconds, or that a single consumer thread will receive the next (single) message from the topic and then poll for the next message every 10 seconds?

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting back over again and polling for the next message?

If that is your use case, I'd say: why use Spark at all? Its entire advantage is that you can read in parallel. The only hacky workaround I can think of is to create a Kafka topic with a single partition, which would make Spark assign the entire offset range to a single worker, but that is ugly.
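If you did go down the single-partition road, a minimal sketch of the serial path, reusing the createKafkaStream helper and the ambient sc from the question, might look like the following. The serialConsumerHandler name is mine, the coalesce(1) call is my own belt-and-braces addition rather than anything Spark requires, and the println stands in for whatever "process it completely" means in your case:

def serialConsumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    // Assumes "someTopic" was created with exactly one partition, so each
    // batch's RDD already has a single Spark partition.
    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD { rdd =>
        // coalesce(1) funnels everything through one task even if the topic
        // later gains partitions -- at the cost of giving up all parallelism.
        rdd.coalesce(1).foreachPartition { messages =>
            messages.foreach { case (_, value) =>
                println(value)  // stand-in for fully processing each message, in order
            }
        }
    }

    ssc
}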

Does that mean that a single consumer thread will receive all messages that were published to the topic in the last 10 seconds or that a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?

Neither. Since you're using the direct (receiverless) stream approach, it means that every 10 seconds your driver will ask Kafka for the offset ranges that have changed since the last batch, for each partition of the topic in question. Spark will then take each such offset range and send it to one of the workers to consume directly from Kafka. This means that with the direct stream approach there is a 1:1 correspondence between Kafka partitions and Spark partitions.
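You can watch that happen by asking each batch's RDD for its offset ranges. This is only a rough sketch under the same 0.8 direct-stream API as the question's code: the HasOffsetRanges cast is the standard way to get at the ranges, while the offsetLoggingHandler name and the println output are just mine:

import org.apache.spark.streaming.kafka.HasOffsetRanges

def offsetLoggingHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD { rdd =>
        // Runs on the driver once per 10-second batch.
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        println(s"Spark partitions: ${rdd.partitions.length}, Kafka partitions: ${ranges.length}")
        ranges.foreach { r =>
            println(s"${r.topic} / partition ${r.partition}: offsets ${r.fromOffset} to ${r.untilOffset}")
        }
    }

    ssc
}

For a topic with N partitions you should see N offset ranges and N Spark partitions per batch, which is the 1:1 correspondence described above.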