Spark Streaming Kafka offset management

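I have been working on a Spark Streaming job that consumes and produces data through Kafka. I use a direct DStream, so I have to manage the offsets myself; we use Redis to write and read them. Now there is a problem: when I start my client, it needs to get the offsets from Redis rather than the offsets stored in Kafka itself. How should I write my code? This is the code I have so far: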

    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        topics=[config.CONSUME_TOPIC],
        kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                     "auto.offset.reset": "largest"},
        fromOffsets=read_offset_range(config.OFFSET_KEY))
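read_offset_range is the asker's own helper and its body is not shown. Below is a sketch of what such a helper could look like, with a hypothetical signature that differs from the call above. It assumes one Redis hash per topic whose fields are partition numbers and whose values are the next offset to read, and a redis-py StrictRedis client (only hgetall is used). make_key defaults to plain (topic, partition) tuples; passing TopicAndPartition from pyspark.streaming.kafka should give a dict that createDirectStream accepts as fromOffsets.

```python
def read_offset_range(client, key, topic, make_key=lambda t, p: (t, p)):
    """Hypothetical helper: load saved start offsets from a Redis hash.

    The hash stored at `key` is assumed to map partition number -> next
    offset to read for `topic`. `client` is any object with a redis-py style
    hgetall(); `make_key` builds the dict key for each (topic, partition).
    """
    from_offsets = {}
    # redis-py returns bytes for fields and values by default; int() accepts
    # bytes (and str) that hold ASCII digits.
    for partition, offset in client.hgetall(key).items():
        from_offsets[make_key(topic, int(partition))] = int(offset)
    return from_offsets
```

For example: read_offset_range(redis.StrictRedis(), "offsets:xyz", "xyz", make_key=TopicAndPartition), where "offsets:xyz" is a made-up key name. If the hash does not exist, hgetall returns an empty dict, so the function returns {} and the stream starts from auto.offset.reset instead. On Python 2, the offsets would need the same long() conversion used in the answer.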

But I think fromOffsets only supplies the value (read from Redis) when the Spark Streaming client starts, not while it is running. Thank you for your help.
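fromOffsets is indeed only used when the stream is created, so to resume from the right place after a restart the job has to write its progress back to Redis while it runs. A minimal sketch of that, assuming a hypothetical layout of one Redis hash per topic whose fields are partition numbers and whose values are the next offset to read, and a redis-py StrictRedis client (only hset is used). save_offset_ranges, save_offsets_after_each_batch and process_rdd are illustrative names. The sketch calls offsetRanges() on each batch's RDD inside foreachRDD applied directly to the stream returned by KafkaUtils.createDirectStream (the pyspark.streaming.kafka API used in this thread), and writes the offsets only after the batch has been processed.

```python
def save_offset_ranges(client, key, offset_ranges):
    """Hypothetical helper: store, per partition, the offset to resume from.

    `offset_ranges` is what KafkaRDD.offsetRanges() returns; each item has
    .partition and .untilOffset. untilOffset is exclusive (one past the last
    message in the batch), so it is the next offset to read after a restart.
    """
    for o in offset_ranges:
        client.hset(key, o.partition, o.untilOffset)


def save_offsets_after_each_batch(kafka_stream, process_rdd, client, key):
    """Process every batch, then record its end offsets in Redis.

    Must be applied to the stream returned by createDirectStream itself:
    offsetRanges() is only available on the RDDs handed to the first
    foreachRDD/transform on that stream, not further down a chain.
    """
    def handle(rdd):
        offset_ranges = rdd.offsetRanges()  # read before anything else
        process_rdd(rdd)                     # the job's real output action
        save_offset_ranges(client, key, offset_ranges)

    kafka_stream.foreachRDD(handle)
```

Saving after processing gives at-least-once delivery: if the job dies between process_rdd and the Redis write, that batch is read again on restart, so the processing should tolerate duplicates. The per-partition hset calls are separate commands; redis-py's pipeline() could group them into one transaction if that matters.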

If I understand you correctly, you need to set the offsets manually. This is how I do it:

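from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

sc = SparkContext(appName="kafka-offsets")
ssc = StreamingContext(sc, 120)  # 120-second batch interval

kafkaParams = {"metadata.broker.list": "1:6667,2:6667,3:6667"}  # placeholder broker list
kafkaParams["auto.offset.reset"] = "smallest"  # only consulted when no start offsets are given
kafkaParams["enable.auto.commit"] = "false"

topic = "xyz"
topic_partition = TopicAndPartition(topic, 0)
start_offset = 0  # placeholder: put the numeric offset to start from here (e.g. read from Redis)
fromOffset = {topic_partition: long(start_offset)}  # Python 2: offsets are passed as long

kafka_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams, fromOffsets=fromOffset)
kafka_stream.pprint()  # at least one output operation is required before start()

ssc.start()
ssc.awaitTermination()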