Spark Structured Streaming using spark-acid writeStream (with checkpoint) throwing org.apache.hadoop.fs.FileAlreadyExistsException
In our Spark application we use Spark Structured Streaming. It consumes Kafka as the input stream and writes to a Hive table via a HiveAcid writeStream. The HiveAcid format is provided by spark-acid, an open-source library from Qubole: https://github.com/qubole/spark-acid
Below is our code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_confluent_avro
....
val spark = SparkSession
  .builder()
  .appName("events")
  .config("spark.sql.streaming.metricsEnabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val input_stream_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  // the JSON offsets literal needs a triple-quoted Scala string (single quotes are char literals)
  .option("startingOffsets", """{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}""")
  .option("maxOffsetsPerTrigger", 10000)
  .option("subscribe", "events")
  .load()

// schema registry config
val srConfig = Map(
  "schema.registry.url" -> "http://schema-registry:8081",
  "value.schema.naming.strategy" -> "topic.name",
  "schema.registry.topic" -> "events",
  "value.schema.id" -> "latest"
)

val data = input_stream_df
  .withColumn("value", from_confluent_avro(col("value"), srConfig))
  .withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
  .select(
    $"value.*",
    year($"timestamp_s") as 'year,
    month($"timestamp_s") as 'month,
    dayofmonth($"timestamp_s") as 'day
  )

// format "HiveAcid" is provided by the spark-acid lib from Qubole
val output_stream_df = data.writeStream.format("HiveAcid")
  .queryName("hiveSink")
  .option("database", "default")
  .option("table", "events_sink")
  .option("checkpointLocation", "/user/spark/events/checkpoint")
  .option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

output_stream_df.awaitTermination()
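For completeness, spark-acid is published as a package, so it can be pulled in at submit time. The coordinates and version below are an assumption on my part and should be verified against the project's README before use:

```shell
# Hypothetical submit command; verify the spark-acid coordinates/version
# against https://github.com/qubole/spark-acid before using.
spark-submit \
  --packages com.qubole:spark-acid_2.11:0.4.0 \
  events-app.jar
```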
We were able to deploy the application to production and redeploy it several times (~10) without issue. Then it started failing with the following error:
Query hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] terminated with exception: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 42, 10.236.7.228, executor 3): org.apache.hadoop.fs.FileAlreadyExistsException: /warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_0020079_0020079/bucket_00003 for client 10.236.7.228 already exists (...)
at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484)
at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$$anonfun.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$$anonfun.apply(TableWriter.scala:153)
(...)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon.apply(TableWriter.scala:139)
Each time the application is restarted, the "already exists" error names a different delta + bucket file. Yet those files are (most likely) newly created on each start, so we don't understand why the error is thrown.
Any pointers would be appreciated.
I found the real root cause in the workers' error logs. It was an out-of-memory problem caused by a code change I had made in one of the libraries we use.
What I posted above was the driver's error log, produced after several failures had already occurred on the worker nodes.
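For anyone hitting the same symptom: the FileAlreadyExistsException seen on the driver can mask the first executor-side failure (here an OOM), because the retried task finds the ORC bucket file left behind by the failed attempt. Inspecting the executor logs directly is what surfaces the real error. Assuming a YARN deployment (not stated in the question; the application ID below is a placeholder), something like:

```shell
# Hypothetical commands, assuming the job runs on YARN; adapt to your cluster manager.
# Pull the aggregated logs for the application, then search for the first
# executor-side error rather than the driver's final stage-failure message.
yarn logs -applicationId application_1589000000000_0042 > app.log
grep -nE "OutOfMemoryError|Container killed|Killed" app.log | head -n 5
```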