Spark collect_list 将 data_type 从数组更改为字符串

Spark collect_list change data_type from array to string

我正在进行以下聚合

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))

我在这里做一些聚合并使用 collect_list 收集结果。之前我们使用的是 spark 1,它为我提供了以下数据类型。

 |-- final_data1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_data2: array (nullable = true)
 |    |-- element: string (containsNull = true)

现在我们必须迁移到 spark 2,但我们低于架构。

|-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

在得到 first() 下面的记录是不同的

spark 1.6

[2020-09-26, Ayush, 103.67] => datatype string

spark 2 

WrappedArray(2020-09-26, Ayush, 103.67)

如何保持相同的数据类型?

编辑 - 尝试使用 Concat

我得到像 Spark 1.6 这样的精确模式的一种方法是像这样使用 concat

val df_date_agg = df
    .groupBy($"msisdn",$"event_date",$"network")
    .agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
    .groupBy($"msisdn")
    .agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))

它会影响我的代码性能吗??有更好的方法吗?

填充 final1 和 final2 列将解决此问题。

val data = Seq((1,"A", "B"), (1, "C", "D"), (2,"E", "F"), (2,"G", "H"), (2,"I", "J"))

val df = spark.createDataFrame(
  data
).toDF("col1", "col2", "col3")

val old_df = df.groupBy(col("col1")).agg(
    collect_list(
        array(
            col("col2"), 
            col("col3")
            )
    ).as("final")
    )
val new_df = old_df.select(col("col1"), flatten(col("final")).as("final_new"))
println("Input Dataframe")

df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()

println("New schema format")
new_df.show(false)
new_df.printSchema()

输出:

Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |A   |B   |
|1   |C   |D   |
|2   |E   |F   |
|2   |G   |H   |
|2   |I   |J   |
+----+----+----+

Old schema format
+----+------------------------+
|col1|final                   |
+----+------------------------+
|1   |[[A, B], [C, D]]        |
|2   |[[E, F], [G, H], [I, J]]|
+----+------------------------+

root
 |-- col1: integer (nullable = false)
 |-- final: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

New schema format
+----+------------------+
|col1|final_new         |
+----+------------------+
|1   |[A, B, C, D]      |
|2   |[E, F, G, H, I, J]|
+----+------------------+

root
 |-- col1: integer (nullable = false)
 |-- final_new: array (nullable = true)
 |    |-- element: string (containsNull = true)

你的具体情况

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
         .select(flatten(col("final_data1").as("final_data1"), flatten(col("final_data2).as("final_data2))

既然你想要一个数组的字符串表示,那么将数组转换成这样的字符串怎么样?

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
         collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))

这可能只是您旧版本的 spark 正在做的事情。

顺便说一句,您提出的解决方案可能也能正常工作,但没有必要用 lit 包裹您的列引用 (lit($"event_date"))。 $"event_date" 够了。