Spark 数据集:数据转换
Spark Dataset : data transformation
我有一个格式为 -
的 Spark 数据集
+--------------+--------+-----+
|name |type |cost |
+--------------+--------+-----+
|AAAAAAAAAAAAAA|XXXXX |0.24|
|AAAAAAAAAAAAAA|YYYYY |1.14|
|BBBBBBBBBBBBBB|XXXXX |0.78|
|BBBBBBBBBBBBBB|YYYYY |2.67|
|BBBBBBBBBBBBBB|ZZZZZ |0.15|
|CCCCCCCCCCCCCC|XXXXX |1.86|
|CCCCCCCCCCCCCC|YYYYY |1.50|
|CCCCCCCCCCCCCC|ZZZZZ |1.00|
+--------------+--------+----+
我想将其转换为类型的对象 -
public class CostPerName {
private String name;
private Map<String, Double> costTypeMap;
}
我要的是,
+--------------+-----------------------------------------------+
|name | typeCost. |
+--------------+-----------------------------------------------+
|AAAAAAAAAAAAAA|(XXXXX, 0.24), (YYYYY, 1.14) |
|BBBBBBBBBBBBBB|(XXXXX, 0.78), (YYYYY, 2.67), (ZZZZZ, 0.15) |
|CCCCCCCCCCCCCC|(XXXXX, 1.86), (YYYYY, 1.50), (ZZZZZ, 1.00) |
+--------------+-----------------------------------------------+
即,对于每个 name
,我想要一张 (type, cost)
的地图。
实现这种转变的有效方法是什么?我可以使用一些数据帧转换吗?我尝试了 groupBy,但只有在我执行汇总查询(如 sum、avg 等)时才有效。
您可以将 type 和 cost 这两个列组合成一个新的 struct column, then group by name and use collect_list 作为聚合函数:
df.withColumn("type_cost", struct("type", "cost"))
.groupBy("name").agg(collect_list("type_cost"))
这将导致像这样的数据帧:
+--------------+---------------------------------------------+
|name |collect_list(type_cost) |
+--------------+---------------------------------------------+
|AAAAAAAAAAAAAA|[[XXXXX, 0.24], [YYYYY, 1.14]] |
|CCCCCCCCCCCCCC|[[XXXXX, 1.86], [YYYYY, 1.50], [ZZZZZ, 1.00]]|
|BBBBBBBBBBBBBB|[[XXXXX, 0.78], [YYYYY, 2.67], [ZZZZZ, 0.15]]|
+--------------+---------------------------------------------+
如果您的 Spark 版本允许,您可以使用 map_from_arrays()
:
scala> val df2 = df.groupBy("name").agg(map_from_arrays(collect_list($"type"), collect_list($"cost")).as("typeCost"))
df2: org.apache.spark.sql.DataFrame = [name: string, typeCost: map<string,decimal(3,2)>]
scala> df2.printSchema()
root
|-- name: string (nullable = false)
|-- typeCost: map (nullable = true)
| |-- key: string
| |-- value: decimal(3,2) (valueContainsNull = true)
scala> df2.show(false)
+--------------+---------------------------------------------+
|name |typeCost |
+--------------+---------------------------------------------+
|AAAAAAAAAAAAAA|[XXXXX -> 0.24, YYYYY -> 1.14] |
|CCCCCCCCCCCCCC|[XXXXX -> 1.86, YYYYY -> 1.50, ZZZZZ -> 1.00]|
|BBBBBBBBBBBBBB|[XXXXX -> 0.78, YYYYY -> 2.67, ZZZZZ -> 0.15]|
+--------------+---------------------------------------------+
scala>
我有一个格式为 -
的 Spark 数据集+--------------+--------+-----+
|name |type |cost |
+--------------+--------+-----+
|AAAAAAAAAAAAAA|XXXXX |0.24|
|AAAAAAAAAAAAAA|YYYYY |1.14|
|BBBBBBBBBBBBBB|XXXXX |0.78|
|BBBBBBBBBBBBBB|YYYYY |2.67|
|BBBBBBBBBBBBBB|ZZZZZ |0.15|
|CCCCCCCCCCCCCC|XXXXX |1.86|
|CCCCCCCCCCCCCC|YYYYY |1.50|
|CCCCCCCCCCCCCC|ZZZZZ |1.00|
+--------------+--------+----+
我想将其转换为类型的对象 -
public class CostPerName {
private String name;
private Map<String, Double> costTypeMap;
}
我要的是,
+--------------+-----------------------------------------------+
|name | typeCost. |
+--------------+-----------------------------------------------+
|AAAAAAAAAAAAAA|(XXXXX, 0.24), (YYYYY, 1.14) |
|BBBBBBBBBBBBBB|(XXXXX, 0.78), (YYYYY, 2.67), (ZZZZZ, 0.15) |
|CCCCCCCCCCCCCC|(XXXXX, 1.86), (YYYYY, 1.50), (ZZZZZ, 1.00) |
+--------------+-----------------------------------------------+
即,对于每个 name
,我想要一张 (type, cost)
的地图。
实现这种转变的有效方法是什么?我可以使用一些数据帧转换吗?我尝试了 groupBy,但只有在我执行汇总查询(如 sum、avg 等)时才有效。
您可以将 type 和 cost 这两个列组合成一个新的 struct column, then group by name and use collect_list 作为聚合函数:
df.withColumn("type_cost", struct("type", "cost"))
.groupBy("name").agg(collect_list("type_cost"))
这将导致像这样的数据帧:
+--------------+---------------------------------------------+
|name |collect_list(type_cost) |
+--------------+---------------------------------------------+
|AAAAAAAAAAAAAA|[[XXXXX, 0.24], [YYYYY, 1.14]] |
|CCCCCCCCCCCCCC|[[XXXXX, 1.86], [YYYYY, 1.50], [ZZZZZ, 1.00]]|
|BBBBBBBBBBBBBB|[[XXXXX, 0.78], [YYYYY, 2.67], [ZZZZZ, 0.15]]|
+--------------+---------------------------------------------+
如果您的 Spark 版本允许,您可以使用 map_from_arrays()
:
scala> val df2 = df.groupBy("name").agg(map_from_arrays(collect_list($"type"), collect_list($"cost")).as("typeCost"))
df2: org.apache.spark.sql.DataFrame = [name: string, typeCost: map<string,decimal(3,2)>]
scala> df2.printSchema()
root
|-- name: string (nullable = false)
|-- typeCost: map (nullable = true)
| |-- key: string
| |-- value: decimal(3,2) (valueContainsNull = true)
scala> df2.show(false)
+--------------+---------------------------------------------+
|name |typeCost |
+--------------+---------------------------------------------+
|AAAAAAAAAAAAAA|[XXXXX -> 0.24, YYYYY -> 1.14] |
|CCCCCCCCCCCCCC|[XXXXX -> 1.86, YYYYY -> 1.50, ZZZZZ -> 1.00]|
|BBBBBBBBBBBBBB|[XXXXX -> 0.78, YYYYY -> 2.67, ZZZZZ -> 0.15]|
+--------------+---------------------------------------------+
scala>