在 Spark 中高效聚合多个 CSV

Efficiently Aggregate Many CSVs in Spark

请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。

我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果可以简化操作,我可以在本地下载它们。我的目标是尽可能高效地做到这一点。当我的几十个 Spark worker 闲置时,让一些单线程 master 下载和解析一堆 CSV 文件似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。

CSV 文件的目录结构如下所示:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...

我有两年的数据,每天都有目录,每个里面有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 出错了,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某处的日志中通知我发生了这种情况,就可以跳过这些文件。

好像我想到的每一个Spark项目都是这个形式,不知道怎么解决。 (例如,尝试读取一堆制表符分隔的 weather data,或读取一堆日志文件来查看这些文件。)

我试过的

我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言;我对正确使用 idioms/tools 更感兴趣。

纯 Scala

我最初的想法是枚举和 parallelize 所有 year/mm-dd 组合的列表,这样我就可以让我的 Spark 工作人员每天独立处理(下载并解析所有 CSV 文件,然后堆栈它们彼此重叠 (unionAll()) 以减少它们)。不幸的是,使用 spark-csv library can only be done in the "parent"/master job, and not from each child as Spark doesn't allow job nesting 下载和解析 CSV 文件。所以只要我想使用 Spark 库来执行 importing/parsing.

就不会起作用

混合语言

当然,您可以使用该语言的本机 CSV 解析来读取每个文件,然后 "upload" 将它们发送到 Spark。在 R 中,这是一些包的组合,用于从 S3 中获取文件,然后是 read.csv,并以 createDataFrame() 结束以将数据导入 Spark。不幸的是,这真的很慢,而且似乎也与我希望 Spark 的工作方式背道而驰。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?

Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp

我开始研究这些量身定制的工具,但很快就不知所措了。我的理解是 many/all 这些工具可用于将我的 CSV 文件从 S3 导入 HDFS。

当然,从 HDFS 读取我的 CSV 文件比从 S3 读取要快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 文件需要解析,但我不知道在 Spark 中有一种分布式的方式来解析。

所以现在 (Spark 1.4) SparkR 支持 jsonparquet 文件结构。可以解析 Csv 文件,但随后需要使用额外的 jar 启动 spark 上下文(需要下载并放置在适当的文件夹中,我自己从未这样做过,但我的同事有)。

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

the docs 中有更多信息。我希望更新的 spark 版本对此有更多支持。

如果您不这样做,您将需要求助于不同的文件结构或使用 python 将所有文件从 .csv 转换为 .parquet。这是最近一次 python 演讲的片段。

data = sc.textFile(s3_paths, 1200).cache()

def caster(x):
    return Row(colname1 = x[0], colname2 = x[1])

df_rdd = data\
    .map(lambda x: x.split(','))\
    .map(caster)

ddf = sqlContext.inferSchema(df_rdd).cache()

ddf.write.save('s3n://<bucket>/<filename>.parquet')

另外,你的数据集有多大?您甚至可能不需要 spark 来进行分析。请注意,也是截至目前;

  • SparkR 仅支持 DataFrame。
  • 还没有分布式机器学习。
  • 为了可视化,如果您想使用像 ggplot2 这样的库,您需要将分布式数据帧转换回普通数据帧。
  • 如果你的数据集不超过几千兆字节,那么学习 spark 的额外麻烦可能还不值得
  • 现在还不多,但你可以期待更多的未来

我之前 运行 遇到过这个问题(但是我读取了大量的 Parquet 文件),我的建议是避免使用数据帧并使用 RDD。

常用成语是:

  1. 读入文件列表,每个文件占一行(在驱动程序中)。这里的预期输出是一个字符串列表
  2. 并行化字符串列表并使用客户 csv 映射它们 reader。 return 是案例列表 类.

如果在一天结束时您想要一个像 List[weather_data] 这样的数据结构,并且可以重写为 parquet 或数据库,您也可以使用 flatMap。