如何让 Spark 在重启时使用之前的偏移量 ID?

How do I make Spark consume from the previous offset id on restarting?

我正在使用 Spark 使用来自 Kafka 的数据。在消耗一些数据后重新启动 Spark,我如何确保 Spark 将从它停止的偏移量开始消耗?

例如,如果第一个 运行,Spark 消耗了直到偏移 ID x。我如何确保在下一个 运行 它将以偏移 ID x+1 开始?

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

在基于接收器的方法中(您通过 KafkaUtils.createStream 使用),偏移量由 WAL (Write Ahead Log), which is responsible for resuming your application from the proper offsets. If you want to be able to control exactly where you resume your application from, look into KafkaUtils.createDStream and generally the Direct (receiverless) API streaming approach 保存和处理,重载采用 fromOffsets: Map[TopicAndPartition, Long].