如何使用 pyspark 检查 file/folder 是否存在而不会出现异常

How to check a file/folder is present using pyspark without getting exception

我试图在从数据块中的 pyspark 读取文件之前检查文件是否存在以避免异常?我尝试了下面的代码片段,但是当文件不存在时出现异常

from pyspark.sql import *
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
try:
    df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter",",").options(header='true', inferschema='true').load('/FileStore/tables/HealthCareSample_dumm.csv')
    print("File Exists")
except IOError:
    print("file not found")`

当我有文件时,它读取文件并 "prints File Exists" 但是当文件不存在时它会抛出 "AnalysisException: 'Path does not exist: dbfs:/FileStore/tables/HealthCareSample_dumm.csv;'"

看来您应该将 except IOError: 更改为 except AnalysisException:

Spark 在很多情况下抛出的 errors/exception 与常规 python 不同。它在读取文件时不会执行典型的 python io 操作,因此抛出不同的异常是有意义的。

很高兴在 Whosebug 上见到你。

我支持 dijksterhuis 的解决方案,但有一个例外 - Analysis Exception是Spark中非常普遍的异常,可能由于多种原因导致,而不仅仅是由于文件丢失。

如果要检查文件是否存在,则需要绕过Spark的FS抽象,直接访问存储系统(无论是s3,posix,还是其他)。这个解决方案的缺点是缺乏抽象——一旦你改变了你的底层 FS,你也需要改变你的代码。

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))

感谢@Dror 和@Kini。我运行 spark on cluster,必须加sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),这里s3是你集群文件系统的前缀

  def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))

您可以验证文件是否存在,如下所示:

import os

if os.path.isfile('/path/file.csv'):
     print("File Exists")
     my_df = spark.read.load("/path/file.csv")
     ...
else:            
     print("File doesn't exists")
dbutils.fs.ls(file_location)

不要导入 dbutils。当您启动集群时它已经存在。

@rosefun 发布的答案对我有用,但我花了很多时间才让它起作用。因此,我将详细介绍该解决方案的工作原理以及您应该避免的情况。

def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))

功能相同,可以正常检查文件是否存在于您提供的 S3 存储桶路径中。

您将必须根据您为此函数指定路径值的方式更改此函数。

path = f"s3://bucket-name/import/data/"
pathexists = path_exists(path)

如果您定义的路径变量在路径中有 s3 前缀,那么它将起作用。

此外,拆分字符串的代码部分只为您提供存储桶名称,如下所示:

path.split("/")[2] will give you `bucket-name`

但是如果路径中没有 s3 前缀,那么您将不得不通过更改一些代码来使用该函数,如下所示:

def path_exists(path):
   # spark is a SparkSession
   sc = spark.sparkContext
   fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path),
        sc._jsc.hadoopConfiguration(),
   )
   return fs.exists(sc._jvm.org.apache.hadoop.fs.Path("s3://" + path))