如何使用 pyspark 检查 file/folder 是否存在而不会出现异常
How to check a file/folder is present using pyspark without getting exception
我试图在从数据块中的 pyspark 读取文件之前检查文件是否存在以避免异常?我尝试了下面的代码片段,但是当文件不存在时出现异常
from pyspark.sql import *
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
try:
df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter",",").options(header='true', inferschema='true').load('/FileStore/tables/HealthCareSample_dumm.csv')
print("File Exists")
except IOError:
print("file not found")`
当我有文件时,它读取文件并 "prints File Exists" 但是当文件不存在时它会抛出 "AnalysisException: 'Path does not exist: dbfs:/FileStore/tables/HealthCareSample_dumm.csv;'"
看来您应该将 except IOError:
更改为 except AnalysisException:
。
Spark 在很多情况下抛出的 errors/exception 与常规 python 不同。它在读取文件时不会执行典型的 python io 操作,因此抛出不同的异常是有意义的。
很高兴在 Whosebug 上见到你。
我支持 dijksterhuis 的解决方案,但有一个例外 -
Analysis Exception是Spark中非常普遍的异常,可能由于多种原因导致,而不仅仅是由于文件丢失。
如果要检查文件是否存在,则需要绕过Spark的FS抽象,直接访问存储系统(无论是s3,posix,还是其他)。这个解决方案的缺点是缺乏抽象——一旦你改变了你的底层 FS,你也需要改变你的代码。
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
感谢@Dror 和@Kini。我运行 spark on cluster,必须加sc._jvm.java.net.URI.create("s3://" + path.split("/")[2])
,这里s3
是你集群文件系统的前缀
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
您可以验证文件是否存在,如下所示:
import os
if os.path.isfile('/path/file.csv'):
print("File Exists")
my_df = spark.read.load("/path/file.csv")
...
else:
print("File doesn't exists")
dbutils.fs.ls(file_location)
不要导入 dbutils。当您启动集群时它已经存在。
@rosefun 发布的答案对我有用,但我花了很多时间才让它起作用。因此,我将详细介绍该解决方案的工作原理以及您应该避免的情况。
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
功能相同,可以正常检查文件是否存在于您提供的 S3 存储桶路径中。
您将必须根据您为此函数指定路径值的方式更改此函数。
path = f"s3://bucket-name/import/data/"
pathexists = path_exists(path)
如果您定义的路径变量在路径中有 s3 前缀,那么它将起作用。
此外,拆分字符串的代码部分只为您提供存储桶名称,如下所示:
path.split("/")[2] will give you `bucket-name`
但是如果路径中没有 s3 前缀,那么您将不得不通过更改一些代码来使用该函数,如下所示:
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path("s3://" + path))
我试图在从数据块中的 pyspark 读取文件之前检查文件是否存在以避免异常?我尝试了下面的代码片段,但是当文件不存在时出现异常
from pyspark.sql import *
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
try:
df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter",",").options(header='true', inferschema='true').load('/FileStore/tables/HealthCareSample_dumm.csv')
print("File Exists")
except IOError:
print("file not found")`
当我有文件时,它读取文件并 "prints File Exists" 但是当文件不存在时它会抛出 "AnalysisException: 'Path does not exist: dbfs:/FileStore/tables/HealthCareSample_dumm.csv;'"
看来您应该将 except IOError:
更改为 except AnalysisException:
。
Spark 在很多情况下抛出的 errors/exception 与常规 python 不同。它在读取文件时不会执行典型的 python io 操作,因此抛出不同的异常是有意义的。
很高兴在 Whosebug 上见到你。
我支持 dijksterhuis 的解决方案,但有一个例外 - Analysis Exception是Spark中非常普遍的异常,可能由于多种原因导致,而不仅仅是由于文件丢失。
如果要检查文件是否存在,则需要绕过Spark的FS抽象,直接访问存储系统(无论是s3,posix,还是其他)。这个解决方案的缺点是缺乏抽象——一旦你改变了你的底层 FS,你也需要改变你的代码。
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
感谢@Dror 和@Kini。我运行 spark on cluster,必须加sc._jvm.java.net.URI.create("s3://" + path.split("/")[2])
,这里s3
是你集群文件系统的前缀
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
您可以验证文件是否存在,如下所示:
import os
if os.path.isfile('/path/file.csv'):
print("File Exists")
my_df = spark.read.load("/path/file.csv")
...
else:
print("File doesn't exists")
dbutils.fs.ls(file_location)
不要导入 dbutils。当您启动集群时它已经存在。
@rosefun 发布的答案对我有用,但我花了很多时间才让它起作用。因此,我将详细介绍该解决方案的工作原理以及您应该避免的情况。
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
功能相同,可以正常检查文件是否存在于您提供的 S3 存储桶路径中。
您将必须根据您为此函数指定路径值的方式更改此函数。
path = f"s3://bucket-name/import/data/"
pathexists = path_exists(path)
如果您定义的路径变量在路径中有 s3 前缀,那么它将起作用。
此外,拆分字符串的代码部分只为您提供存储桶名称,如下所示:
path.split("/")[2] will give you `bucket-name`
但是如果路径中没有 s3 前缀,那么您将不得不通过更改一些代码来使用该函数,如下所示:
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path("s3://" + path))