用于防止 FileNotFoundExceptions 的 Spark 代码?
Spark code to protect from FileNotFoundExceptions?
Is there a way to run my spark program and be shielded from files
underneath changing?
代码从读取 parquet 文件开始(读取期间没有错误):
val mappings = spark.read.parquet(S3_BUCKET_PATH + "/table/mappings/")
然后它对数据进行转换,例如
val newTable = mappings.join(anotherTable, 'id)
这些转换需要几个小时(这是另一个问题)。
有时作业会完成,有时会终止并显示以下类似消息:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 6 in stage 1014.0 failed 4 times, most recent failure: Lost task
6.3 in stage 1014.0 (TID 106820, 10.127.251.252, executor 5): java.io.FileNotFoundException: No such file or directory:
s3a://bucket1/table/mappings/part-00007-21eac9c5-yyzz-4295-a6ef-5f3bb13bed64.snappy.parquet
我们认为另一项工作正在更改我们下面的文件,但未能找到罪魁祸首。
这里要解决的问题非常复杂。如果在同一数据帧上操作时基础数据发生变化,则 spark 作业将失败。原因是当创建数据框时,底层 RDD 知道数据的位置以及与之关联的 DAG。现在,如果底层数据突然被某个作业改变了,RDD 别无选择,只能让它失败。
一种启用重试、推测等的可能性,但问题仍然存在。通常,如果您在 parquet 中有一个 table 并且您想同时读写,请按日期或时间对 table 进行分区,然后写入将发生在不同的分区中,而读取将发生在不同的分区中.
现在加入需要很长时间的问题。如果您从 s3 读取数据然后加入并再次写回 s3,性能会变慢。因为现在 hadoop 需要先从 s3 获取数据然后执行操作(代码不去数据)。尽管网络调用速度很快,但我 运行 对 s3 与 EMR FS 进行了一些实验,发现 s3 速度降低了 50%。
一种替代方法是将数据从 s3 复制到 HDFS,然后 运行 加入。这将使您免受数据覆盖并且性能会更快。
如果您使用的是 spark 2.2 s3,最后一件事是由于 DirectOutputCommiter 的弃用而非常慢。所以这可能是放缓的另一个原因
Is there a way to run my spark program and be shielded from files underneath changing?
代码从读取 parquet 文件开始(读取期间没有错误):
val mappings = spark.read.parquet(S3_BUCKET_PATH + "/table/mappings/")
然后它对数据进行转换,例如
val newTable = mappings.join(anotherTable, 'id)
这些转换需要几个小时(这是另一个问题)。
有时作业会完成,有时会终止并显示以下类似消息:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 1014.0 failed 4 times, most recent failure: Lost task 6.3 in stage 1014.0 (TID 106820, 10.127.251.252, executor 5): java.io.FileNotFoundException: No such file or directory: s3a://bucket1/table/mappings/part-00007-21eac9c5-yyzz-4295-a6ef-5f3bb13bed64.snappy.parquet
我们认为另一项工作正在更改我们下面的文件,但未能找到罪魁祸首。
这里要解决的问题非常复杂。如果在同一数据帧上操作时基础数据发生变化,则 spark 作业将失败。原因是当创建数据框时,底层 RDD 知道数据的位置以及与之关联的 DAG。现在,如果底层数据突然被某个作业改变了,RDD 别无选择,只能让它失败。
一种启用重试、推测等的可能性,但问题仍然存在。通常,如果您在 parquet 中有一个 table 并且您想同时读写,请按日期或时间对 table 进行分区,然后写入将发生在不同的分区中,而读取将发生在不同的分区中.
现在加入需要很长时间的问题。如果您从 s3 读取数据然后加入并再次写回 s3,性能会变慢。因为现在 hadoop 需要先从 s3 获取数据然后执行操作(代码不去数据)。尽管网络调用速度很快,但我 运行 对 s3 与 EMR FS 进行了一些实验,发现 s3 速度降低了 50%。
一种替代方法是将数据从 s3 复制到 HDFS,然后 运行 加入。这将使您免受数据覆盖并且性能会更快。
如果您使用的是 spark 2.2 s3,最后一件事是由于 DirectOutputCommiter 的弃用而非常慢。所以这可能是放缓的另一个原因