Spark 数据集转换为数组
Spark Data set transformation to array
我有如下数据集; col1 的值重复多次,col2 的值唯一。这个原始数据集几乎可以有 10 亿行,所以我不想使用 collect 或 collect_list,因为它不会针对我的用例进行横向扩展。
原始数据集:
+---------------------|
| col1 | col2 |
+---------------------|
| AA| 11 |
| BB| 21 |
| AA| 12 |
| AA| 13 |
| BB| 22 |
| CC| 33 |
+---------------------|
我想将数据集转换为以下数组格式。 newColumn 作为 col2 的数组。
转换后的数据集:
+---------------------|
|col1 | newColumn|
+---------------------|
| AA| [11,12,13]|
| BB| [21,22] |
| CC| [33] |
+---------------------|
我看过 解决方案,但它使用 collect_list 并且不会在大数据集上横向扩展。
- 加载你的数据框
- 分组依据
col1
- 使用
collect_list
将 col2
聚合到一个列表
import org.apache.spark.sql.functions
object GroupToArray {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
//Load your dataframe
val df = List(("AA", "11"),
("BB", "21"),
("AA", "12"),
("AA", "13"),
("BB", "22"),
("CC", "33")).toDF("col1","col2")
//Group by 'col1'
df.groupBy("col1")
//agregate on col2 and combine it to a list
.agg(functions.collect_list("col2").as("newColumn"))
.show()
}
}
使用spark的内置函数永远是最好的方法。
我认为使用 collect_list 函数没有问题。只要你有足够的内存,这将是最好的方法。
优化您的工作的一种方法是将您的数据保存为 parquet ,将其按 A 列存储并将其保存为 table。更好的办法是也按均匀分布数据的一些列对其进行分区。
例如,
df_stored = #load your data from csv or parquet or any format'
spark.catalog.setCurrentDatabase(database_name)
df_stored.write.mode("overwrite").format("parquet").partitionBy(part_col).bucketBy(10,"col1").option("path",savepath).saveAsTable(tablename)
df_analysis = spark.table(tablename)
df_aggreg = df_analysis.groupby('col1').agg(F.collect_list(col('col2')))
这将加快聚合速度并避免大量混洗。试试看
我有如下数据集; col1 的值重复多次,col2 的值唯一。这个原始数据集几乎可以有 10 亿行,所以我不想使用 collect 或 collect_list,因为它不会针对我的用例进行横向扩展。
原始数据集:
+---------------------|
| col1 | col2 |
+---------------------|
| AA| 11 |
| BB| 21 |
| AA| 12 |
| AA| 13 |
| BB| 22 |
| CC| 33 |
+---------------------|
我想将数据集转换为以下数组格式。 newColumn 作为 col2 的数组。
转换后的数据集:
+---------------------|
|col1 | newColumn|
+---------------------|
| AA| [11,12,13]|
| BB| [21,22] |
| CC| [33] |
+---------------------|
我看过
- 加载你的数据框
- 分组依据
col1
- 使用
collect_list
将
col2
聚合到一个列表
import org.apache.spark.sql.functions
object GroupToArray {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
//Load your dataframe
val df = List(("AA", "11"),
("BB", "21"),
("AA", "12"),
("AA", "13"),
("BB", "22"),
("CC", "33")).toDF("col1","col2")
//Group by 'col1'
df.groupBy("col1")
//agregate on col2 and combine it to a list
.agg(functions.collect_list("col2").as("newColumn"))
.show()
}
}
使用spark的内置函数永远是最好的方法。 我认为使用 collect_list 函数没有问题。只要你有足够的内存,这将是最好的方法。 优化您的工作的一种方法是将您的数据保存为 parquet ,将其按 A 列存储并将其保存为 table。更好的办法是也按均匀分布数据的一些列对其进行分区。
例如,
df_stored = #load your data from csv or parquet or any format'
spark.catalog.setCurrentDatabase(database_name)
df_stored.write.mode("overwrite").format("parquet").partitionBy(part_col).bucketBy(10,"col1").option("path",savepath).saveAsTable(tablename)
df_analysis = spark.table(tablename)
df_aggreg = df_analysis.groupby('col1').agg(F.collect_list(col('col2')))
这将加快聚合速度并避免大量混洗。试试看