为什么 pyspark 无法读取此 csv 文件?
Why is pyspark unable to read this csv file?
我无法在众多 Stack Overflow 类似问题“如何将 csv 读入 pyspark 数据帧?”中找到这个问题。 (请参阅末尾听起来相似但不同问题的列表)。
有问题的 CSV 文件位于集群驱动程序的 tmp 目录中,请注意,此 csv 文件有意不在 Databricks DBFS 云存储中。使用 DBFS 不适用于导致此问题的用例。
请注意,我正在尝试使用 Spark 3.2.1 和 Scala 2.12 在 Databricks runtime 10.3 上运行此功能。
y_header = ['fruit','color','size','note']
y = [('apple','red','medium','juicy')]
y.append(('grape','purple','small','fresh'))
import csv
with (open('/tmp/test.csv','w')) as f:
w = csv.writer(f)
w.writerow(y_header)
w.writerows(y)
然后使用python os验证文件是否已创建:
import os
list(filter(lambda f: f == 'test.csv',os.listdir('/tmp/')))
现在验证 databricks Spark API 可以看到文件,必须使用 file:///
dbutils.fs.ls('file:///tmp/test.csv')
现在,可选步骤,为 Spark 指定数据帧架构以应用于 csv 文件:
from pyspark.sql.types import *
csv_schema = StructType([StructField('fruit', StringType()), StructField('color', StringType()), StructField('size', StringType()), StructField('note', StringType())])
现在定义 PySpark 数据框:
x = spark.read.csv('file:///tmp/test.csv',header=True,schema=csv_schema)
上面一行运行没有错误,但是请记住,由于惰性执行,spark 引擎仍然没有读取文件。所以接下来我们将给 Spark 一个强制它执行数据帧的命令:
display(x)
错误是:
FileReadException:读取文件文件时出错:/tmp/test.csv。 pos 基础文件已更新。您可以通过 SQL 中的 运行 'REFRESH TABLE tableName' 命令或通过重新创建所涉及的 Dataset/DataFrame 显式使 Spark 中的缓存无效。如果 Delta 缓存过时或底层文件已被删除,您可以通过重启集群手动使 Delta 缓存失效。
原因:FileNotFoundException:文件文件:/tmp/test.csv 不存在。 . .
并深入研究我发现的错误:java.io.FileNotFoundException:文件文件:/tmp/test.csv 不存在。而且我已经尝试过重启集群,重启并没有清除错误。
但我可以证明该文件确实存在,只是由于某种原因 Spark 和 Java 无法访问它,因为我可以用 pandas 读取同一个文件没问题:
import pandas as p
p.read_csv('/tmp/test.csv')
那么如何让 spark 读取这个 csv 文件?
附录 - 我搜索过但没有回答我的问题的类似 spark read csv 问题列表:1 2 4 6 7 8
我猜 databricks 文件加载器似乎无法识别绝对路径 /tmp/
。
您可以尝试以下解决方法。
- 使用 Pandas Dataframe
使用路径读取文件
- 使用
CreateDataFrame
函数将pandas数据帧传递给Spark
代码:
df_pd = pd.read_csv('File:///tmp/test.csv')
sparkDF=spark.createDataFrame(df_pd)
sparkDF.display()
输出 :
我通过电子邮件联系了一位 Databricks 架构师,他确认 Databricks 只能在单节点设置中本地(从集群)读取。
因此 DBFS 是包含 >1 个节点的典型集群中随机 writing/reading 文本数据文件的唯一选择。
我无法在众多 Stack Overflow 类似问题“如何将 csv 读入 pyspark 数据帧?”中找到这个问题。 (请参阅末尾听起来相似但不同问题的列表)。
有问题的 CSV 文件位于集群驱动程序的 tmp 目录中,请注意,此 csv 文件有意不在 Databricks DBFS 云存储中。使用 DBFS 不适用于导致此问题的用例。
请注意,我正在尝试使用 Spark 3.2.1 和 Scala 2.12 在 Databricks runtime 10.3 上运行此功能。
y_header = ['fruit','color','size','note']
y = [('apple','red','medium','juicy')]
y.append(('grape','purple','small','fresh'))
import csv
with (open('/tmp/test.csv','w')) as f:
w = csv.writer(f)
w.writerow(y_header)
w.writerows(y)
然后使用python os验证文件是否已创建:
import os
list(filter(lambda f: f == 'test.csv',os.listdir('/tmp/')))
现在验证 databricks Spark API 可以看到文件,必须使用 file:///
dbutils.fs.ls('file:///tmp/test.csv')
现在,可选步骤,为 Spark 指定数据帧架构以应用于 csv 文件:
from pyspark.sql.types import *
csv_schema = StructType([StructField('fruit', StringType()), StructField('color', StringType()), StructField('size', StringType()), StructField('note', StringType())])
现在定义 PySpark 数据框:
x = spark.read.csv('file:///tmp/test.csv',header=True,schema=csv_schema)
上面一行运行没有错误,但是请记住,由于惰性执行,spark 引擎仍然没有读取文件。所以接下来我们将给 Spark 一个强制它执行数据帧的命令:
display(x)
错误是: FileReadException:读取文件文件时出错:/tmp/test.csv。 pos 基础文件已更新。您可以通过 SQL 中的 运行 'REFRESH TABLE tableName' 命令或通过重新创建所涉及的 Dataset/DataFrame 显式使 Spark 中的缓存无效。如果 Delta 缓存过时或底层文件已被删除,您可以通过重启集群手动使 Delta 缓存失效。 原因:FileNotFoundException:文件文件:/tmp/test.csv 不存在。 . .
并深入研究我发现的错误:java.io.FileNotFoundException:文件文件:/tmp/test.csv 不存在。而且我已经尝试过重启集群,重启并没有清除错误。
但我可以证明该文件确实存在,只是由于某种原因 Spark 和 Java 无法访问它,因为我可以用 pandas 读取同一个文件没问题:
import pandas as p
p.read_csv('/tmp/test.csv')
那么如何让 spark 读取这个 csv 文件?
附录 - 我搜索过但没有回答我的问题的类似 spark read csv 问题列表:1 2
我猜 databricks 文件加载器似乎无法识别绝对路径 /tmp/
。
您可以尝试以下解决方法。
- 使用 Pandas Dataframe 使用路径读取文件
- 使用
CreateDataFrame
函数将pandas数据帧传递给Spark
代码:
df_pd = pd.read_csv('File:///tmp/test.csv')
sparkDF=spark.createDataFrame(df_pd)
sparkDF.display()
输出 :
我通过电子邮件联系了一位 Databricks 架构师,他确认 Databricks 只能在单节点设置中本地(从集群)读取。
因此 DBFS 是包含 >1 个节点的典型集群中随机 writing/reading 文本数据文件的唯一选择。