如何使用 Spark 的 Direct Stream for Kafka 设置消费者组提交的偏移量?
How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?
我正在尝试使用 Spark's Direct Approach (No Receivers) for Kafka,我有以下 Kafka 配置图:
configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");
现在我的 objective 是,如果我的 Spark 管道崩溃并再次启动,流应该从消费者组提交的最新偏移量开始。因此,为此,我想为消费者指定起始偏移量。我有关于每个分区中提交的偏移量的信息。我如何将此信息提供给流功能。目前我正在使用
JavaPairInputDStream<byte[], byte[]> kafkaData =
KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
DefaultDecoder.class, DefaultDecoder.class,configMap,topic);
查看 Spark API docs 中 createDirectStream 的第二种形式 - 它允许您传入 Map<TopicAndPartition, Long>
,其中 Long 是偏移量。
请注意,当使用 DirectInputStream 时,Spark 不会自动更新 Zookeeper 中的偏移量 - 您必须自己将它们写入 ZK 或其他数据库。除非你对 exactly-once 语义有严格的要求,否则使用 createStream 方法取回一个 DStream 会更容易,在这种情况下,Spark 将更新 ZK 中的偏移量,并在失败的情况下从上次存储的偏移量恢复。
根据您的要求,正确的解决方案是使用检查点。对于每个处理过的 RDDStream,检查点会将元数据写入指定的共享存储(通常是 hdfs)。它的元数据,而不是真实数据,因此没有真正的性能影响。
如果 spark 进程崩溃并重新启动,它将首先读取检查点并从保存的距检查点的偏移量恢复。
您可以参考示例代码,其中我使用 spark streaming 使用检查点将数据可靠地写入 elasticsearch。
https://github.com/atulsm/Test_Projects/blob/master/src/spark/StreamingKafkaRecoverableDirectEvent.java
我正在尝试使用 Spark's Direct Approach (No Receivers) for Kafka,我有以下 Kafka 配置图:
configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");
现在我的 objective 是,如果我的 Spark 管道崩溃并再次启动,流应该从消费者组提交的最新偏移量开始。因此,为此,我想为消费者指定起始偏移量。我有关于每个分区中提交的偏移量的信息。我如何将此信息提供给流功能。目前我正在使用
JavaPairInputDStream<byte[], byte[]> kafkaData =
KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
DefaultDecoder.class, DefaultDecoder.class,configMap,topic);
查看 Spark API docs 中 createDirectStream 的第二种形式 - 它允许您传入 Map<TopicAndPartition, Long>
,其中 Long 是偏移量。
请注意,当使用 DirectInputStream 时,Spark 不会自动更新 Zookeeper 中的偏移量 - 您必须自己将它们写入 ZK 或其他数据库。除非你对 exactly-once 语义有严格的要求,否则使用 createStream 方法取回一个 DStream 会更容易,在这种情况下,Spark 将更新 ZK 中的偏移量,并在失败的情况下从上次存储的偏移量恢复。
根据您的要求,正确的解决方案是使用检查点。对于每个处理过的 RDDStream,检查点会将元数据写入指定的共享存储(通常是 hdfs)。它的元数据,而不是真实数据,因此没有真正的性能影响。
如果 spark 进程崩溃并重新启动,它将首先读取检查点并从保存的距检查点的偏移量恢复。
您可以参考示例代码,其中我使用 spark streaming 使用检查点将数据可靠地写入 elasticsearch。 https://github.com/atulsm/Test_Projects/blob/master/src/spark/StreamingKafkaRecoverableDirectEvent.java