在 Spark 中高效聚合多个 CSV
Efficiently Aggregate Many CSVs in Spark
请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。
我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果可以简化操作,我可以在本地下载它们。我的目标是尽可能高效地做到这一点。当我的几十个 Spark worker 闲置时,让一些单线程 master 下载和解析一堆 CSV 文件似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。
CSV 文件的目录结构如下所示:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
我有两年的数据,每天都有目录,每个里面有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 出错了,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某处的日志中通知我发生了这种情况,就可以跳过这些文件。
好像我想到的每一个Spark项目都是这个形式,不知道怎么解决。 (例如,尝试读取一堆制表符分隔的 weather data,或读取一堆日志文件来查看这些文件。)
我试过的
我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言;我对正确使用 idioms/tools 更感兴趣。
纯 Scala
我最初的想法是枚举和 parallelize
所有 year/mm-dd
组合的列表,这样我就可以让我的 Spark 工作人员每天独立处理(下载并解析所有 CSV 文件,然后堆栈它们彼此重叠 (unionAll()
) 以减少它们)。不幸的是,使用 spark-csv library can only be done in the "parent"/master job, and not from each child as Spark doesn't allow job nesting 下载和解析 CSV 文件。所以只要我想使用 Spark 库来执行 importing/parsing.
就不会起作用
混合语言
当然,您可以使用该语言的本机 CSV 解析来读取每个文件,然后 "upload" 将它们发送到 Spark。在 R 中,这是一些包的组合,用于从 S3 中获取文件,然后是 read.csv
,并以 createDataFrame()
结束以将数据导入 Spark。不幸的是,这真的很慢,而且似乎也与我希望 Spark 的工作方式背道而驰。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?
Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp
我开始研究这些量身定制的工具,但很快就不知所措了。我的理解是 many/all 这些工具可用于将我的 CSV 文件从 S3 导入 HDFS。
当然,从 HDFS 读取我的 CSV 文件比从 S3 读取要快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 文件需要解析,但我不知道在 Spark 中有一种分布式的方式来解析。
所以现在 (Spark 1.4) SparkR 支持 json
或 parquet
文件结构。可以解析 Csv 文件,但随后需要使用额外的 jar 启动 spark 上下文(需要下载并放置在适当的文件夹中,我自己从未这样做过,但我的同事有)。
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
the docs 中有更多信息。我希望更新的 spark 版本对此有更多支持。
如果您不这样做,您将需要求助于不同的文件结构或使用 python 将所有文件从 .csv
转换为 .parquet
。这是最近一次 python 演讲的片段。
data = sc.textFile(s3_paths, 1200).cache()
def caster(x):
return Row(colname1 = x[0], colname2 = x[1])
df_rdd = data\
.map(lambda x: x.split(','))\
.map(caster)
ddf = sqlContext.inferSchema(df_rdd).cache()
ddf.write.save('s3n://<bucket>/<filename>.parquet')
另外,你的数据集有多大?您甚至可能不需要 spark 来进行分析。请注意,也是截至目前;
- SparkR 仅支持 DataFrame。
- 还没有分布式机器学习。
- 为了可视化,如果您想使用像
ggplot2
这样的库,您需要将分布式数据帧转换回普通数据帧。
- 如果你的数据集不超过几千兆字节,那么学习 spark 的额外麻烦可能还不值得
- 现在还不多,但你可以期待更多的未来
我之前 运行 遇到过这个问题(但是我读取了大量的 Parquet 文件),我的建议是避免使用数据帧并使用 RDD。
常用成语是:
- 读入文件列表,每个文件占一行(在驱动程序中)。这里的预期输出是一个字符串列表
- 并行化字符串列表并使用客户 csv 映射它们 reader。 return 是案例列表 类.
如果在一天结束时您想要一个像 List[weather_data] 这样的数据结构,并且可以重写为 parquet 或数据库,您也可以使用 flatMap。
请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。
我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果可以简化操作,我可以在本地下载它们。我的目标是尽可能高效地做到这一点。当我的几十个 Spark worker 闲置时,让一些单线程 master 下载和解析一堆 CSV 文件似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。
CSV 文件的目录结构如下所示:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
我有两年的数据,每天都有目录,每个里面有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 出错了,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某处的日志中通知我发生了这种情况,就可以跳过这些文件。
好像我想到的每一个Spark项目都是这个形式,不知道怎么解决。 (例如,尝试读取一堆制表符分隔的 weather data,或读取一堆日志文件来查看这些文件。)
我试过的
我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言;我对正确使用 idioms/tools 更感兴趣。
纯 Scala
我最初的想法是枚举和 parallelize
所有 year/mm-dd
组合的列表,这样我就可以让我的 Spark 工作人员每天独立处理(下载并解析所有 CSV 文件,然后堆栈它们彼此重叠 (unionAll()
) 以减少它们)。不幸的是,使用 spark-csv library can only be done in the "parent"/master job, and not from each child as Spark doesn't allow job nesting 下载和解析 CSV 文件。所以只要我想使用 Spark 库来执行 importing/parsing.
混合语言
当然,您可以使用该语言的本机 CSV 解析来读取每个文件,然后 "upload" 将它们发送到 Spark。在 R 中,这是一些包的组合,用于从 S3 中获取文件,然后是 read.csv
,并以 createDataFrame()
结束以将数据导入 Spark。不幸的是,这真的很慢,而且似乎也与我希望 Spark 的工作方式背道而驰。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?
Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp
我开始研究这些量身定制的工具,但很快就不知所措了。我的理解是 many/all 这些工具可用于将我的 CSV 文件从 S3 导入 HDFS。
当然,从 HDFS 读取我的 CSV 文件比从 S3 读取要快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 文件需要解析,但我不知道在 Spark 中有一种分布式的方式来解析。
所以现在 (Spark 1.4) SparkR 支持 json
或 parquet
文件结构。可以解析 Csv 文件,但随后需要使用额外的 jar 启动 spark 上下文(需要下载并放置在适当的文件夹中,我自己从未这样做过,但我的同事有)。
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
the docs 中有更多信息。我希望更新的 spark 版本对此有更多支持。
如果您不这样做,您将需要求助于不同的文件结构或使用 python 将所有文件从 .csv
转换为 .parquet
。这是最近一次 python 演讲的片段。
data = sc.textFile(s3_paths, 1200).cache()
def caster(x):
return Row(colname1 = x[0], colname2 = x[1])
df_rdd = data\
.map(lambda x: x.split(','))\
.map(caster)
ddf = sqlContext.inferSchema(df_rdd).cache()
ddf.write.save('s3n://<bucket>/<filename>.parquet')
另外,你的数据集有多大?您甚至可能不需要 spark 来进行分析。请注意,也是截至目前;
- SparkR 仅支持 DataFrame。
- 还没有分布式机器学习。
- 为了可视化,如果您想使用像
ggplot2
这样的库,您需要将分布式数据帧转换回普通数据帧。 - 如果你的数据集不超过几千兆字节,那么学习 spark 的额外麻烦可能还不值得
- 现在还不多,但你可以期待更多的未来
我之前 运行 遇到过这个问题(但是我读取了大量的 Parquet 文件),我的建议是避免使用数据帧并使用 RDD。
常用成语是:
- 读入文件列表,每个文件占一行(在驱动程序中)。这里的预期输出是一个字符串列表
- 并行化字符串列表并使用客户 csv 映射它们 reader。 return 是案例列表 类.
如果在一天结束时您想要一个像 List[weather_data] 这样的数据结构,并且可以重写为 parquet 或数据库,您也可以使用 flatMap。