Spark Structured Streaming - 由于输入源数量增加,检查点出现断言错误

Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources

我正在尝试将两个流合并为一个并将结果写入主题

代码: 1-阅读两个主题

val PERSONINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("subscribe", "PERSONINFORMATION")
    .option("group.id", "info")
    .option("maxOffsetsPerTrigger", 1000)
    .option("startingOffsets", "earliest")
    .load()


val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "CANDIDATEINFORMATION")
    .option("group.id", "candent")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()

2- 解析数据加入他们:

val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
      .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")

   val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
      .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")

   val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
   val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")

3- 连接两帧

  val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")

  val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))

4- 将它们写入主题

  string2json.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", xxxx:9092")
      .option("topic", "toDelete")
      .option("checkpointLocation", "checkpoints")
      .option("failOnDataLoss", "false")
      .start()
      .awaitTermination()

错误信息:

    21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
       

你的代码在我看来没问题,而是导致问题的检查点。

根据您收到的错误消息,您可能 运行 此作业只有一个流源。然后,您添加了流连接的代码,并尝试在不删除现有检查点文件的情况下重新启动应用程序。现在,应用程序尝试从检查点文件中恢复,但意识到您最初只有一个源,现在有两个源。

Recovery Semantics after Changes in a Streaming Query 部分解释了在使用检查点时哪些更改是允许的,哪些是不允许的。不允许更改输入源的数量:

"Changes in the number or type (i.e. different source) of input sources: This is not allowed."

解决您的问题:删除当前检查点文件并重新启动作业。