如何让 spark 2.0 像 csv 一样读取 mutli 文件夹镶木地板
how to let spark 2.0 reading mutli folders parquet like csv
我有一些日常数据要保存到多个文件夹(主要是基于时间)。现在我有两种格式来存储文件,一种是 parquet 另一种是 csv ,我想保存为 parquet 格式以节省一些 space。
文件夹结构如下:
[root@hdp raw]# tree
.
├── entityid=10001
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100055
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100082
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── entityid=10012
└── year=2017
└── quarter=1
└── month=1
├── day=6
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── day=7
└── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
现在我有一个python列表存储所有需要读取的文件夹,假设每次运行它需要根据过滤条件只读取部分文件夹
folderList=df_inc.collect()
folderString=[]
for x in folderList:
folderString.append(x.folders)
In [44]: folderString
Out[44]:
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7',
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7',
u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7']
文件的作者:
df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix)
当我尝试读取存储在 folderString 中的文件夹时 df_batch=spark.read.parquet(folderString)
错误 java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String encounters.
如果我以 csv 格式保存文件并通过下面的代码读取它,它就可以正常工作,如下所示:无论如何请阅读 parquet 文件夹的文件列表,非常感谢!
In [46]: folderList=df_inc.collect()
...: folderString=[]
...:
...: for x in folderList:
...: folderString.append(x.folders)
...: df_batch=spark.read.csv(folderString)
...:
In [47]: df_batch.show()
+------------+---+-------------------+----------+----------+
| _c0|_c1| _c2| _c3| _c4|
+------------+---+-------------------+----------+----------+
|6C25B9C3DD54| 1|2017-01-07 00:00:01|1483718401|1483718400|
|38BC1ADB0164| 3|2017-01-06 00:00:01|1483632001|1483632000|
|38BC1ADB0164| 3|2017-01-07 00:00:01|1483718401|1483718400|
我解决了这个问题:
df=spark.read.parquet(folderString[0])
y=0
for x in folderString:
if y>0:
df=df.union(spark.read.parquet(x))
y=y+1
这是一个非常丑陋的解决方案,如果您有好的想法,请告诉我。非常感谢。
几天后,找到了解决问题的完美方法:
df=spark.read.parquet(*folderString)
您对 Hadoop 和 Parquet 中的分区有误解。
看,我有一个按年-月分区的简单文件结构。是这样的:
my_folder
.
├── year-month=2016-12
| └── my_files.parquet
├── year-month=2016-11
| └── my_files.parquet
如果我从 my_folder
读取数据帧 reader 中没有任何过滤器,如下所示:
df = saprk.read.parquet("path/to/my_folder")
df.show()
如果您检查 Spark DAG 可视化,您会发现在这种情况下它将读取我所有的分区,如您所说:
在上面的例子中,第一个方块中的每个点都是我数据的一个分区。
但是如果我将代码更改为:
df = saprk.read.parquet("path/to/my_folder")\
.filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) &
(col('year-month') <= lit(my_date.strftime('%Y-%m'))))
DAG 可视化将显示我正在使用的分区数量:
因此,如果您按分区列进行过滤,您将不会读取所有文件。正是你需要的,你不需要使用一个文件夹一个文件夹读取的解决方案。
我有一些日常数据要保存到多个文件夹(主要是基于时间)。现在我有两种格式来存储文件,一种是 parquet 另一种是 csv ,我想保存为 parquet 格式以节省一些 space。 文件夹结构如下:
[root@hdp raw]# tree
.
├── entityid=10001
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100055
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100082
│ └── year=2017
│ └── quarter=1
│ └── month=1
│ ├── day=6
│ │ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│ └── day=7
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── entityid=10012
└── year=2017
└── quarter=1
└── month=1
├── day=6
│ └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── day=7
└── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
现在我有一个python列表存储所有需要读取的文件夹,假设每次运行它需要根据过滤条件只读取部分文件夹
folderList=df_inc.collect()
folderString=[]
for x in folderList:
folderString.append(x.folders)
In [44]: folderString
Out[44]:
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7',
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7',
u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6',
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7']
文件的作者:
df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix)
当我尝试读取存储在 folderString 中的文件夹时 df_batch=spark.read.parquet(folderString)
错误 java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String encounters.
如果我以 csv 格式保存文件并通过下面的代码读取它,它就可以正常工作,如下所示:无论如何请阅读 parquet 文件夹的文件列表,非常感谢!
In [46]: folderList=df_inc.collect()
...: folderString=[]
...:
...: for x in folderList:
...: folderString.append(x.folders)
...: df_batch=spark.read.csv(folderString)
...:
In [47]: df_batch.show()
+------------+---+-------------------+----------+----------+
| _c0|_c1| _c2| _c3| _c4|
+------------+---+-------------------+----------+----------+
|6C25B9C3DD54| 1|2017-01-07 00:00:01|1483718401|1483718400|
|38BC1ADB0164| 3|2017-01-06 00:00:01|1483632001|1483632000|
|38BC1ADB0164| 3|2017-01-07 00:00:01|1483718401|1483718400|
我解决了这个问题:
df=spark.read.parquet(folderString[0])
y=0
for x in folderString:
if y>0:
df=df.union(spark.read.parquet(x))
y=y+1
这是一个非常丑陋的解决方案,如果您有好的想法,请告诉我。非常感谢。
几天后,找到了解决问题的完美方法:
df=spark.read.parquet(*folderString)
您对 Hadoop 和 Parquet 中的分区有误解。
看,我有一个按年-月分区的简单文件结构。是这样的:
my_folder
.
├── year-month=2016-12
| └── my_files.parquet
├── year-month=2016-11
| └── my_files.parquet
如果我从 my_folder
读取数据帧 reader 中没有任何过滤器,如下所示:
df = saprk.read.parquet("path/to/my_folder")
df.show()
如果您检查 Spark DAG 可视化,您会发现在这种情况下它将读取我所有的分区,如您所说:
在上面的例子中,第一个方块中的每个点都是我数据的一个分区。
但是如果我将代码更改为:
df = saprk.read.parquet("path/to/my_folder")\
.filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) &
(col('year-month') <= lit(my_date.strftime('%Y-%m'))))
DAG 可视化将显示我正在使用的分区数量:
因此,如果您按分区列进行过滤,您将不会读取所有文件。正是你需要的,你不需要使用一个文件夹一个文件夹读取的解决方案。