使用hadoop parquet处理大数据到CSV输出
Process big data using hadoop parquet to CSV output
我有 3 个数据集,我想加入并分组它们以获得包含聚合数据的 CSV。
数据作为 parquet 文件存储在 Hadoop 中,我使用 Zeppelin 到 运行 Apache Spark+Scala 进行数据处理。
我的数据集如下所示:
user_actions.show(10)
user_clicks.show(10)
user_options.show(10)
+--------------------+--------------------+
| id| keyword|
+--------------------+--------------------+
|00000000000000000001| aaaa1|
|00000000000000000002| aaaa1|
|00000000000000000003| aaaa2|
|00000000000000000004| aaaa2|
|00000000000000000005| aaaa0|
|00000000000000000006| aaaa4|
|00000000000000000007| aaaa1|
|00000000000000000008| aaaa2|
|00000000000000000009| aaaa1|
|00000000000000000010| aaaa1|
+--------------------+--------------------+
+--------------------+-------------------+
| search_id| selected_user_id|
+--------------------+-------------------+
|00000000000000000001| 1234|
|00000000000000000002| 1234|
|00000000000000000003| 1234|
|00000000000000000004| 1234|
+--------------------+-------------------+
+--------------------+----------+----------+
| search_id| user_id| position|
+--------------------+----------+----------+
|00000000000000000001| 1230| 1|
|00000000000000000001| 1234| 3|
|00000000000000000001| 1232| 2|
|00000000000000000002| 1231| 1|
|00000000000000000002| 1232| 2|
|00000000000000000002| 1233| 3|
|00000000000000000002| 1234| 4|
|00000000000000000003| 1234| 1|
|00000000000000000004| 1230| 1|
|00000000000000000004| 1234| 2|
+--------------------+----------+----------+
我想要实现的是为每个用户 ID 获取一个 JSON 和关键字,因为我需要将它们导入 MySQL 并将 user_id 作为 PK。
user_id,keywords
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}"
如果 JSON 不是现成的,我可以使用元组或任何字符串:
user_id,keywords
1234,"(aaaa1,0.58333),(aaaa2,1.5)"
到目前为止我所做的是:
val user_actions_data = user_actions
.join(user_options, user_options("search_id") === user_actions("id"))
val user_actions_full_data = user_actions_data
.join(
user_clicks,
user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"),
"left_outer"
)
val user_actions_data_groupped = user_actions_full_data
.groupBy("user_id", "search")
.agg("search" -> "count", "selected_user_id" -> "count", "position" -> "avg")
def udfScoreForUser = ((position: Double, searches: Long) => ( position/searches ))
val search_log_keywords = user_actions_data_groupped.rdd.map({row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2)))}).groupByKey()
val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
val search_log_keywords_df = sc.parallelize(search_log_keywords_array).toDF("user_id","keywords")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("overwrite")
.save("hdfs:///Search_log_testing_keywords/")
虽然这对小数据集按预期工作,但我的输出 CSV 文件是:
user_id,keywords
1234,"(aaaa1,0.58333), (aaaa2,0.5)"
我在 运行针对 200+GB 的数据时遇到问题。
我是 Spark&Scala 的新手,但我想我遗漏了一些东西,我不应该使用 DF 到 rdd,收集到数组上的映射,然后将它并行化回 DF 以将其导出到 CSV。
总而言之,我想对所有关键字应用评分并按用户 ID 对它们进行分组并将其保存到 CSV 文件中。到目前为止我所做的工作适用于一个小数据集,但是当我将它应用于 200GB 以上的数据时,apache spark 失败了。
是的,任何依赖于 Spark 中 collect
的东西通常都是错误的——除非您正在调试某些东西。当您调用 collect
时,所有数据都在数组中的驱动程序中收集,因此对于大多数大数据集,这甚至不是一个选项 - 您的驱动程序将抛出 OOM 并死掉。
我不明白的是,你当初为什么要收藏?为什么不简单地在分布式数据集上映射?
search_log_keywords
.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
.toDF("user_id","keywords")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("overwrite")
.save("hdfs:///Search_log_testing_keywords/")
这样一来,一切都是并行进行的。
关于 dataframes
和 rdds
之间的切换,那么我现在不会太担心。我知道社区大多提倡使用 dataframes
,但根据 Spark 的版本和您的用例,rdds
可能是更好的选择。
HDFS 的主要目标是将文件拆分成块并冗余存储。除非你绝对需要一个大文件,否则最好将分区的数据存储在HDFS中。
我有 3 个数据集,我想加入并分组它们以获得包含聚合数据的 CSV。
数据作为 parquet 文件存储在 Hadoop 中,我使用 Zeppelin 到 运行 Apache Spark+Scala 进行数据处理。
我的数据集如下所示:
user_actions.show(10)
user_clicks.show(10)
user_options.show(10)
+--------------------+--------------------+
| id| keyword|
+--------------------+--------------------+
|00000000000000000001| aaaa1|
|00000000000000000002| aaaa1|
|00000000000000000003| aaaa2|
|00000000000000000004| aaaa2|
|00000000000000000005| aaaa0|
|00000000000000000006| aaaa4|
|00000000000000000007| aaaa1|
|00000000000000000008| aaaa2|
|00000000000000000009| aaaa1|
|00000000000000000010| aaaa1|
+--------------------+--------------------+
+--------------------+-------------------+
| search_id| selected_user_id|
+--------------------+-------------------+
|00000000000000000001| 1234|
|00000000000000000002| 1234|
|00000000000000000003| 1234|
|00000000000000000004| 1234|
+--------------------+-------------------+
+--------------------+----------+----------+
| search_id| user_id| position|
+--------------------+----------+----------+
|00000000000000000001| 1230| 1|
|00000000000000000001| 1234| 3|
|00000000000000000001| 1232| 2|
|00000000000000000002| 1231| 1|
|00000000000000000002| 1232| 2|
|00000000000000000002| 1233| 3|
|00000000000000000002| 1234| 4|
|00000000000000000003| 1234| 1|
|00000000000000000004| 1230| 1|
|00000000000000000004| 1234| 2|
+--------------------+----------+----------+
我想要实现的是为每个用户 ID 获取一个 JSON 和关键字,因为我需要将它们导入 MySQL 并将 user_id 作为 PK。
user_id,keywords
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}"
如果 JSON 不是现成的,我可以使用元组或任何字符串:
user_id,keywords
1234,"(aaaa1,0.58333),(aaaa2,1.5)"
到目前为止我所做的是:
val user_actions_data = user_actions
.join(user_options, user_options("search_id") === user_actions("id"))
val user_actions_full_data = user_actions_data
.join(
user_clicks,
user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"),
"left_outer"
)
val user_actions_data_groupped = user_actions_full_data
.groupBy("user_id", "search")
.agg("search" -> "count", "selected_user_id" -> "count", "position" -> "avg")
def udfScoreForUser = ((position: Double, searches: Long) => ( position/searches ))
val search_log_keywords = user_actions_data_groupped.rdd.map({row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2)))}).groupByKey()
val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
val search_log_keywords_df = sc.parallelize(search_log_keywords_array).toDF("user_id","keywords")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("overwrite")
.save("hdfs:///Search_log_testing_keywords/")
虽然这对小数据集按预期工作,但我的输出 CSV 文件是:
user_id,keywords
1234,"(aaaa1,0.58333), (aaaa2,0.5)"
我在 运行针对 200+GB 的数据时遇到问题。
我是 Spark&Scala 的新手,但我想我遗漏了一些东西,我不应该使用 DF 到 rdd,收集到数组上的映射,然后将它并行化回 DF 以将其导出到 CSV。
总而言之,我想对所有关键字应用评分并按用户 ID 对它们进行分组并将其保存到 CSV 文件中。到目前为止我所做的工作适用于一个小数据集,但是当我将它应用于 200GB 以上的数据时,apache spark 失败了。
是的,任何依赖于 Spark 中 collect
的东西通常都是错误的——除非您正在调试某些东西。当您调用 collect
时,所有数据都在数组中的驱动程序中收集,因此对于大多数大数据集,这甚至不是一个选项 - 您的驱动程序将抛出 OOM 并死掉。
我不明白的是,你当初为什么要收藏?为什么不简单地在分布式数据集上映射?
search_log_keywords
.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
.toDF("user_id","keywords")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("overwrite")
.save("hdfs:///Search_log_testing_keywords/")
这样一来,一切都是并行进行的。
关于 dataframes
和 rdds
之间的切换,那么我现在不会太担心。我知道社区大多提倡使用 dataframes
,但根据 Spark 的版本和您的用例,rdds
可能是更好的选择。
HDFS 的主要目标是将文件拆分成块并冗余存储。除非你绝对需要一个大文件,否则最好将分区的数据存储在HDFS中。