如何让 spark 2.0 像 csv 一样读取 mutli 文件夹镶木地板

how to let spark 2.0 reading mutli folders parquet like csv

我有一些日常数据要保存到多个文件夹(主要是基于时间)。现在我有两种格式来存储文件,一种是 parquet 另一种是 csv ,我想保存为 parquet 格式以节省一些 space。 文件夹结构如下:

[root@hdp raw]# tree
.
├── entityid=10001
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100055
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100082
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── entityid=10012
    └── year=2017
        └── quarter=1
            └── month=1
                ├── day=6
                │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
                └── day=7
                    └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet

现在我有一个python列表存储所有需要读取的文件夹,假设每次运行它需要根据过滤条件只读取部分文件夹

folderList=df_inc.collect()
folderString=[]

for x in folderList:
    folderString.append(x.folders)
In [44]: folderString
Out[44]:
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7',
 u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7',
 u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7']

文件的作者:

df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix)

当我尝试读取存储在 folderString 中的文件夹时 df_batch=spark.read.parquet(folderString) 错误 java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String encounters.

如果我以 csv 格式保存文件并通过下面的代码读取它,它就可以正常工作,如下所示:无论如何请阅读 parquet 文件夹的文件列表,非常感谢!

In [46]: folderList=df_inc.collect()
    ...: folderString=[]
    ...:
    ...: for x in folderList:
    ...:     folderString.append(x.folders)
    ...: df_batch=spark.read.csv(folderString)
    ...:

In [47]: df_batch.show()
+------------+---+-------------------+----------+----------+
|         _c0|_c1|                _c2|       _c3|       _c4|
+------------+---+-------------------+----------+----------+
|6C25B9C3DD54|  1|2017-01-07 00:00:01|1483718401|1483718400|
|38BC1ADB0164|  3|2017-01-06 00:00:01|1483632001|1483632000|
|38BC1ADB0164|  3|2017-01-07 00:00:01|1483718401|1483718400|

我解决了这个问题:

df=spark.read.parquet(folderString[0])
y=0
for x in folderString:
    if y>0:
        df=df.union(spark.read.parquet(x))
    y=y+1

这是一个非常丑陋的解决方案,如果您有好的想法,请告诉我。非常感谢。

几天后,找到了解决问题的完美方法:

df=spark.read.parquet(*folderString)

您对 Hadoop 和 Parquet 中的分区有误解。

看,我有一个按年-月分区的简单文件结构。是这样的:

my_folder
.
├── year-month=2016-12
|   └── my_files.parquet
├── year-month=2016-11
|   └── my_files.parquet

如果我从 my_folder 读取数据帧 reader 中没有任何过滤器,如下所示:

df = saprk.read.parquet("path/to/my_folder")
df.show()

如果您检查 Spark DAG 可视化,您会发现在这种情况下它将读取我所有的分区,如您所说:

在上面的例子中,第一个方块中的每个点都是我数据的一个分区。

但是如果我将代码更改为:

df = saprk.read.parquet("path/to/my_folder")\
          .filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) &
                  (col('year-month') <= lit(my_date.strftime('%Y-%m'))))

DAG 可视化将显示我正在使用的分区数量:

因此,如果您按分区列进行过滤,您将不会读取所有文件。正是你需要的,你不需要使用一个文件夹一个文件夹读取的解决方案。