Pyspark - 加载文件:路径不存在
Pyspark - Load file: Path does not exist
我是 Spark 的新手。我正在尝试读取 EMR 集群中的本地 csv 文件。该文件位于:/home/hadoop/。我使用的脚本是这个:
spark = SparkSession \
.builder \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()\
df = spark.read.csv('/home/hadoop/observations_temp.csv, header=True)
当我 运行 脚本引发以下错误消息时:
pyspark.sql.utils.AnalysisException: u'Path does not exist:
hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv
然后,我发现我必须在文件路径中添加 file:// 才能读取本地文件:
df = spark.read.csv('file:///home/hadoop/observations_temp.csv, header=True)
但这一次,上述方法引发了不同的错误:
Lost task 0.3 in stage 0.0 (TID 3,
ip-172-31-41-81.eu-west-1.compute.internal, executor 1):
java.io.FileNotFoundException: File
file:/home/hadoop/observations_temp.csv does not exist
我认为是因为 file// 扩展只是在本地读取文件,它不会将文件分发到其他节点。
你知道我如何读取 csv 文件并使其对所有其他节点可用吗?
您的文件从您的工作节点中丢失这一事实是正确的,因此会引发您遇到的错误。
这里是官方文档Ref. External Datasets。
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
所以基本上你有两个解决方案:
您在开始工作之前将您的文件复制到每个工作人员中;
或者您将上传到 HDFS 中:(推荐的解决方案)
hadoop fs -put localfile /user/hadoop/hadoopfile.csv
现在您可以阅读:
df = spark.read.csv('/user/hadoop/hadoopfile.csv', header=True)
看来您也在使用AWS S3。您始终可以尝试直接从 S3 读取它而无需下载它。 (当然有适当的凭据)
有人建议使用 spark-submit 提供的 --files 标签将文件上传到执行目录。我不推荐这种方法,除非你的 csv 文件非常小,但你不需要 Spark。
或者,我会坚持使用 HDFS(或任何分布式文件系统)。
我认为你缺少的是在初始化 SparkSession 时明确设置主节点,试试这样的事情
spark = SparkSession \
.builder \
.master("local") \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
然后以与之前相同的方式读取文件
df = spark.read.csv('file:///home/hadoop/observations_temp.csv')
这应该可以解决问题...
可能对某人 运行ning zeppelin on mac using Docker 有用。
将文件复制到自定义文件夹:/Users/my_user/zeppspark/myjson.txt
docker 运行 -p 8080:8080 -v /Users/my_user/zeppspark:/zeppelin/notebook --rm --name zeppelin apache/zeppelin:0.9.0
在 Zeppelin 上,您可以运行获取您的文件:
%pyspark
json_data = sc.textFile('/zeppelin/notebook/myjson.txt')
我是 Spark 的新手。我正在尝试读取 EMR 集群中的本地 csv 文件。该文件位于:/home/hadoop/。我使用的脚本是这个:
spark = SparkSession \
.builder \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()\
df = spark.read.csv('/home/hadoop/observations_temp.csv, header=True)
当我 运行 脚本引发以下错误消息时:
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv
然后,我发现我必须在文件路径中添加 file:// 才能读取本地文件:
df = spark.read.csv('file:///home/hadoop/observations_temp.csv, header=True)
但这一次,上述方法引发了不同的错误:
Lost task 0.3 in stage 0.0 (TID 3,
ip-172-31-41-81.eu-west-1.compute.internal, executor 1): java.io.FileNotFoundException: File file:/home/hadoop/observations_temp.csv does not exist
我认为是因为 file// 扩展只是在本地读取文件,它不会将文件分发到其他节点。
你知道我如何读取 csv 文件并使其对所有其他节点可用吗?
您的文件从您的工作节点中丢失这一事实是正确的,因此会引发您遇到的错误。
这里是官方文档Ref. External Datasets。
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
所以基本上你有两个解决方案:
您在开始工作之前将您的文件复制到每个工作人员中;
或者您将上传到 HDFS 中:(推荐的解决方案)
hadoop fs -put localfile /user/hadoop/hadoopfile.csv
现在您可以阅读:
df = spark.read.csv('/user/hadoop/hadoopfile.csv', header=True)
看来您也在使用AWS S3。您始终可以尝试直接从 S3 读取它而无需下载它。 (当然有适当的凭据)
有人建议使用 spark-submit 提供的 --files 标签将文件上传到执行目录。我不推荐这种方法,除非你的 csv 文件非常小,但你不需要 Spark。
或者,我会坚持使用 HDFS(或任何分布式文件系统)。
我认为你缺少的是在初始化 SparkSession 时明确设置主节点,试试这样的事情
spark = SparkSession \
.builder \
.master("local") \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
然后以与之前相同的方式读取文件
df = spark.read.csv('file:///home/hadoop/observations_temp.csv')
这应该可以解决问题...
可能对某人 运行ning zeppelin on mac using Docker 有用。
将文件复制到自定义文件夹:/Users/my_user/zeppspark/myjson.txt
docker 运行 -p 8080:8080 -v /Users/my_user/zeppspark:/zeppelin/notebook --rm --name zeppelin apache/zeppelin:0.9.0
在 Zeppelin 上,您可以运行获取您的文件:
%pyspark
json_data = sc.textFile('/zeppelin/notebook/myjson.txt')