Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources
I am trying to join two streams into one and write the result to a topic.
Code:
1 - Read the two topics:
val PERSONINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xx:9092")
.option("subscribe", "PERSONINFORMATION")
.option("group.id", "info")
.option("maxOffsetsPerTrigger", 1000)
.option("startingOffsets", "earliest")
.load()
val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xxx:9092")
.option("subscribe", "CANDIDATEINFORMATION")
.option("group.id", "candent")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", "false")
.load()
2 - Parse the data and prepare them for the join:
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")
val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
3 - Join the two frames:
val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")
val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))
4 - Write the result to a topic:
string2json.writeStream.format("kafka")
.option("kafka.bootstrap.servers", xxxx:9092")
.option("topic", "toDelete")
.option("checkpointLocation", "checkpoints")
.option("failOnDataLoss", "false")
.start()
.awaitTermination()
Error message:
21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
Your code looks fine to me; it is rather the checkpoint that is causing the problem.
Based on the error message you are getting, you probably ran this job with only a single stream source at first. Then you added the code for the stream-stream join and tried to restart the application without removing the existing checkpoint files. Now the application tries to recover from the checkpoint files, but realises that the query originally had one source whereas it now requests two.
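As a quick sanity check, you can count how many sources the checkpoint actually recorded. The following is a minimal sketch, assuming the usual checkpoint layout in which every file under <checkpoint>/offsets/ starts with a version line ("v1") and a metadata JSON line, followed by one line per input source; recordedSourceCount is a hypothetical helper, not a Spark API, and spark is the session from your job:
import org.apache.hadoop.fs.{FileSystem, Path}

def recordedSourceCount(checkpointDir: String): Int = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // Offsets files are named after their batch id; take the most recent one.
  val offsetFiles = fs.listStatus(new Path(checkpointDir, "offsets")).map(_.getPath)
  val latest = offsetFiles.maxBy(_.getName.toLong)
  val lines = scala.io.Source.fromInputStream(fs.open(latest)).getLines().toList
  // Drop the version line and the metadata line; one line per source remains.
  lines.drop(2).length
}
For your failing query this should return 1, while the restarted query registers 2 sources, which is exactly what the assertion complains about.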
The section Recovery Semantics after Changes in a Streaming Query of the Structured Streaming Programming Guide explains which changes are and are not allowed when using checkpointing. Changing the number of input sources is not allowed:
"Changes in the number or type (i.e. different source) of input sources: This is not allowed."
To solve your problem: delete the current checkpoint files and restart the job.
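If the checkpoint lives on HDFS or another Hadoop-compatible file system, one way to clear it before restarting is the sketch below, assuming the "checkpoints" path from your writeStream call and the spark session in scope; alternatively, simply pass a fresh, unused path to checkpointLocation. Either way the query starts from scratch, so with startingOffsets set to "earliest" it will reprocess both topics from the beginning:
import org.apache.hadoop.fs.{FileSystem, Path}

// Remove the stale checkpoint so the restarted query begins with a clean slate.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("checkpoints"), true)  // recursive = true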