Spark collect_list 将 data_type 从数组更改为字符串
Spark collect_list change data_type from array to string
我正在进行以下聚合
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
我在这里做一些聚合并使用 collect_list
收集结果。之前我们使用的是 spark 1,它为我提供了以下数据类型。
|-- final_data1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- final_data2: array (nullable = true)
| |-- element: string (containsNull = true)
现在我们必须迁移到 spark 2,但我们低于架构。
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
在得到 first()
下面的记录是不同的
spark 1.6
[2020-09-26, Ayush, 103.67] => datatype string
spark 2
WrappedArray(2020-09-26, Ayush, 103.67)
如何保持相同的数据类型?
编辑 - 尝试使用 Concat
我得到像 Spark 1.6 这样的精确模式的一种方法是像这样使用 concat
val df_date_agg = df
.groupBy($"msisdn",$"event_date",$"network")
.agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
.groupBy($"msisdn")
.agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))
它会影响我的代码性能吗??有更好的方法吗?
填充 final1 和 final2 列将解决此问题。
val data = Seq((1,"A", "B"), (1, "C", "D"), (2,"E", "F"), (2,"G", "H"), (2,"I", "J"))
val df = spark.createDataFrame(
data
).toDF("col1", "col2", "col3")
val old_df = df.groupBy(col("col1")).agg(
collect_list(
array(
col("col2"),
col("col3")
)
).as("final")
)
val new_df = old_df.select(col("col1"), flatten(col("final")).as("final_new"))
println("Input Dataframe")
df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()
println("New schema format")
new_df.show(false)
new_df.printSchema()
输出:
Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1 |A |B |
|1 |C |D |
|2 |E |F |
|2 |G |H |
|2 |I |J |
+----+----+----+
Old schema format
+----+------------------------+
|col1|final |
+----+------------------------+
|1 |[[A, B], [C, D]] |
|2 |[[E, F], [G, H], [I, J]]|
+----+------------------------+
root
|-- col1: integer (nullable = false)
|-- final: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
New schema format
+----+------------------+
|col1|final_new |
+----+------------------+
|1 |[A, B, C, D] |
|2 |[E, F, G, H, I, J]|
+----+------------------+
root
|-- col1: integer (nullable = false)
|-- final_new: array (nullable = true)
| |-- element: string (containsNull = true)
你的具体情况
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
.select(flatten(col("final_data1").as("final_data1"), flatten(col("final_data2).as("final_data2))
既然你想要一个数组的字符串表示,那么将数组转换成这样的字符串怎么样?
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))
这可能只是您旧版本的 spark 正在做的事情。
顺便说一句,您提出的解决方案可能也能正常工作,但没有必要用 lit
包裹您的列引用 (lit($"event_date")
)。 $"event_date"
够了。
我正在进行以下聚合
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
我在这里做一些聚合并使用 collect_list
收集结果。之前我们使用的是 spark 1,它为我提供了以下数据类型。
|-- final_data1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- final_data2: array (nullable = true)
| |-- element: string (containsNull = true)
现在我们必须迁移到 spark 2,但我们低于架构。
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
在得到 first()
下面的记录是不同的
spark 1.6
[2020-09-26, Ayush, 103.67] => datatype string
spark 2
WrappedArray(2020-09-26, Ayush, 103.67)
如何保持相同的数据类型?
编辑 - 尝试使用 Concat
我得到像 Spark 1.6 这样的精确模式的一种方法是像这样使用 concat
val df_date_agg = df
.groupBy($"msisdn",$"event_date",$"network")
.agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
.groupBy($"msisdn")
.agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))
它会影响我的代码性能吗??有更好的方法吗?
填充 final1 和 final2 列将解决此问题。
val data = Seq((1,"A", "B"), (1, "C", "D"), (2,"E", "F"), (2,"G", "H"), (2,"I", "J"))
val df = spark.createDataFrame(
data
).toDF("col1", "col2", "col3")
val old_df = df.groupBy(col("col1")).agg(
collect_list(
array(
col("col2"),
col("col3")
)
).as("final")
)
val new_df = old_df.select(col("col1"), flatten(col("final")).as("final_new"))
println("Input Dataframe")
df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()
println("New schema format")
new_df.show(false)
new_df.printSchema()
输出:
Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1 |A |B |
|1 |C |D |
|2 |E |F |
|2 |G |H |
|2 |I |J |
+----+----+----+
Old schema format
+----+------------------------+
|col1|final |
+----+------------------------+
|1 |[[A, B], [C, D]] |
|2 |[[E, F], [G, H], [I, J]]|
+----+------------------------+
root
|-- col1: integer (nullable = false)
|-- final: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
New schema format
+----+------------------+
|col1|final_new |
+----+------------------+
|1 |[A, B, C, D] |
|2 |[E, F, G, H, I, J]|
+----+------------------+
root
|-- col1: integer (nullable = false)
|-- final_new: array (nullable = true)
| |-- element: string (containsNull = true)
你的具体情况
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
.select(flatten(col("final_data1").as("final_data1"), flatten(col("final_data2).as("final_data2))
既然你想要一个数组的字符串表示,那么将数组转换成这样的字符串怎么样?
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))
这可能只是您旧版本的 spark 正在做的事情。
顺便说一句,您提出的解决方案可能也能正常工作,但没有必要用 lit
包裹您的列引用 (lit($"event_date")
)。 $"event_date"
够了。