How to use kafka.group.id and checkpoints in Spark 3.0 Structured Streaming to continue reading from Kafka where it left off after a restart?

Based on the introduction in Spark 3.0, https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html, it should be possible to set "kafka.group.id" to track the offsets. For our use case, I want to avoid potential data loss if the streaming Spark job fails and is restarted. Based on my previous question below, I had the impression that kafka.group.id in Spark 3.0 would help with this.

How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?

However, I tried the following settings in Spark 3.0:

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement


//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding


object App {
    
    def main(args: Array[String]): Unit = {
      
      val spark: SparkSession = SparkSession.builder()
        .appName("MY-APP")
        .getOrCreate()

      import spark.sqlContext.implicits._

      spark.catalog.clearCache()
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

      spark.sparkContext.setLogLevel("ERROR")
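      // Checkpoint directory is set only on the SparkContext here, not on the streaming query itself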
      spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
      
      System.gc()
      
      val df = spark.readStream
        .format("kafka")
          .option("kafka.bootstrap.servers", "mybroker.io:6667")
          .option("subscribe", "mytopic")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
          .option("kafka.group.id","MYID")
          .load()

      df.printSchema()

      
      val schema = new StructType()
        .add("id", StringType)
        .add("x", StringType)
        .add("eventtime", StringType)

      val idservice = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).as("data"))
        .select("data.*")

       
      val monitoring_df = idservice
                .selectExpr("cast(id as string) id", 
                            "cast(x as string) x",
                            "cast(eventtime as string) eventtime")              

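      // Streaming query without a checkpointLocation option on the writeStream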
      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                    batchDF.persist()
                                    printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)                                    
                                    batchDF.show()

                                    batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
                                    spark.catalog.refreshTable("mytable")
                                    
                                    batchDF.unpersist()
                                    spark.catalog.clearCache()
                                }
                            }
                            .start()
                            .awaitTermination()
    }
   
}

The Spark job was tested in standalone mode with the spark-submit command below, but the same problem exists when I deploy it in cluster mode on AWS EMR.

spark-submit --master local[1] \
  --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-local/creds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab \
  --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf spark.dynamicAllocation.enabled=false \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf spark.yarn.maxAppAttempts=1000 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar

Then I started the streaming job to read the streaming data from the Kafka topic. After a while, I killed the Spark job, waited 1 hour, and started the job again. If I understand correctly, the new streaming data should start from the offset at which I killed the Spark job. However, it still starts from the latest offset, which caused data loss during the time the job was stopped.

Do I need to configure more options to avoid data loss? Or do I misunderstand something about Spark 3.0? Thanks!

Problem solved

The key issue here is that the checkpoint must be added to the query specifically. Adding a checkpoint only for the SparkContext is not enough. After adding the checkpoint, it works. In the checkpoint folder, an offset subfolder is created containing offset files 0, 1, 2, 3, ...; each file shows the offset information for the different partitions:

{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}

One suggestion is to put the checkpoint on external storage such as S3. That way the offsets can be recovered even if the EMR cluster itself ever has to be rebuilt.
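
For reference, a minimal sketch of the corrected writeStream call, assuming the same monitoring_df query and output paths as in the code above; the only substantive change is the checkpointLocation option set on the query itself, and the S3 bucket name is a placeholder:

      // Minimal sketch: same query as above, plus a per-query checkpoint location.
      // "s3://my-bucket/checkpoints/mytopic" is a placeholder path.
      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic")
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if (!batchDF.isEmpty) {
                                  batchDF.write.mode(SaveMode.Overwrite)
                                    .option("path", "/home/ec2-user/environment/spark/spark-local/tmp")
                                    .saveAsTable("mytable")
                                }
                              }
                              .start()
                              .awaitTermination()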

According to the Spark Structured Streaming + Kafka Integration Guide, Spark itself keeps track of the offsets and no offsets are committed back to Kafka. That means if your Spark Streaming job fails and you restart it, all the necessary information about the offsets is stored in Spark's checkpointing files.

Even if you set the ConsumerGroup name with kafka.group.id, your application will still not commit the messages back to Kafka. The information on the next offset to read is available only in the checkpointing files of your Spark application.

If you stop and restart your application without a re-deployment and ensure that you do not delete the old checkpoint files, your application will continue reading from where it left off.

The Spark Structured Streaming documentation on Recovering from Failures with Checkpointing says:

"In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) [...]"

This can be achieved by setting the following option in your writeStream query (it is not sufficient to set the checkpoint directory in your SparkContext configuration):

.option("checkpointLocation", "path/to/HDFS/dir")

The documentation also notes that "This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."

In addition, the fault-tolerance capabilities of Spark Structured Streaming also depend on your output sink, as described in the section Output Sinks.

As you are currently using the ForeachBatch sink, you might not have restart capabilities in your application.
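
By default, foreachBatch provides only at-least-once write guarantees; the batchId passed into the function can be used to make the writes idempotent, so that a micro-batch replayed after a restart does not produce duplicate output. A rough sketch of that pattern, assuming the same monitoring_df query and a hypothetical Parquet output directory (this is not part of the original post):

      // Rough sketch of an idempotent foreachBatch write (not from the original post).
      // Each micro-batch is written to a deterministic directory derived from batchId,
      // so a batch replayed after a restart overwrites its own output instead of
      // appending duplicate records.
      val idempotent_stream = monitoring_df.writeStream
        .trigger(Trigger.ProcessingTime("120 seconds"))
        .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic")   // placeholder path
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          batchDF.write
            .mode(SaveMode.Overwrite)
            .parquet(s"/home/ec2-user/environment/spark/spark-local/output/batch_id=$batchId")   // hypothetical output path
        }
        .start()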