Pyspark 将文件保存为 parquet 并读取
Pyspark save file as parquet and read
我的 PySpark
脚本将创建的 DataFrame
保存到目录:
df.write.save(full_path, format=file_format, mode=options['mode'])
如果我在同一个 运行 中读取此文件,一切都很好:
return sqlContext.read.format(file_format).load(full_path)
但是,当我尝试在另一个脚本中从该目录读取文件时 运行 我收到错误消息:
java.io.FileNotFoundException: File does not exist: /hadoop/log_files/some_data.json/part-00000-26c649cb-0c0f-421f-b04a-9d6a81bb6767.json
我知道我可以通过 Spark 的提示找到解决方法:
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
但是我想知道我失败的原因,这种问题的正统方法是什么?
您正在尝试管理与同一个文件相关的两个对象,因此涉及此对象的缓存会给您带来问题,它们都针对同一个文件。这里有一个简单的解决方案,
我的 PySpark
脚本将创建的 DataFrame
保存到目录:
df.write.save(full_path, format=file_format, mode=options['mode'])
如果我在同一个 运行 中读取此文件,一切都很好:
return sqlContext.read.format(file_format).load(full_path)
但是,当我尝试在另一个脚本中从该目录读取文件时 运行 我收到错误消息:
java.io.FileNotFoundException: File does not exist: /hadoop/log_files/some_data.json/part-00000-26c649cb-0c0f-421f-b04a-9d6a81bb6767.json
我知道我可以通过 Spark 的提示找到解决方法:
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
但是我想知道我失败的原因,这种问题的正统方法是什么?
您正在尝试管理与同一个文件相关的两个对象,因此涉及此对象的缓存会给您带来问题,它们都针对同一个文件。这里有一个简单的解决方案,