Concat values from a column and make another column
I am using Spark SQL and doing some SQL operations on a Hive table.
My table looks like this:
```
ID COST CODE
1 100 AB1
5 200 BC3
1 400 FD3
6 600 HJ2
1 900 432
3 800 DS2
2 500 JT4
```
From this I want to create another table that holds, per ID, the total cost and the top 5 codes in a chain in another column, like this:
```
ID TOTAL_COST CODE CODE_CHAIN
1 1400 432 432, FD3, AB1
```
The total cost is easy, but how do I concatenate the values of the CODE column to form another column?
I tried the collect_set function, but the values can neither be limited to 5 nor ordered correctly, probably because of the distributed processing.
Is this possible with any SQL logic?
Edit:
I need the data sorted, so that I get the top 5 values.
Use `slice`, `sort_array` and `collect_list`:
```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"colname" syntax

df
  .groupBy("id")
  .agg(
    sum("cost") as "total_cost",
    slice(sort_array(collect_list(struct($"cost", $"code")), false), 1, 5)("code") as "codes")
```
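As a sanity check, the semantics of that aggregation can be sketched with plain Scala collections (no Spark needed): per id, sum the costs and keep the codes of the five highest-cost rows.

```scala
// Plain-Scala sketch of the aggregation: group by id, sum cost,
// sort each group by cost descending and take the first 5 codes.
val rows = Seq((1, 100, "AB1"), (5, 200, "BC3"), (1, 400, "FD3"),
               (6, 600, "HJ2"), (1, 900, "432"), (3, 800, "DS2"), (2, 500, "JT4"))

val result = rows.groupBy(_._1).map { case (id, rs) =>
  val totalCost = rs.map(_._2).sum
  val codes = rs.sortBy(-_._2).take(5).map(_._3)  // highest cost first
  (id, totalCost, codes)
}.toSeq
```

For id 1 this yields `(1, 1400, Seq("432", "FD3", "AB1"))`, matching the desired CODE_CHAIN order.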
In Spark 2.3 you have to replace `slice` with manual indexing into the sorted array:
```scala
val sorted = sort_array(collect_list(struct($"cost", $"code")), false)("code")
val codes = array((0 until 5).map(i => sorted.getItem(i)): _*) as "codes"
```
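Note a small behavioural difference in this workaround: unlike `slice`, `getItem` with an out-of-range index returns null in Spark, so groups with fewer than 5 codes come back null-padded. A plain-Scala analogue (using `lift`, which stands in here for that out-of-bounds behaviour):

```scala
// For a group with only 3 codes, indexing positions 0..4 pads with nulls,
// the way sorted.getItem(i) does in Spark for out-of-range indices.
val sortedCodes = Seq("432", "FD3", "AB1")
val manual = (0 until 5).map(i => sortedCodes.lift(i).orNull)
```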
Use window functions and a WITH clause, filtering on the first row_number. Check this out:
```scala
scala> val df = Seq((1,100,"AB1"),(5,200,"BC3"),(1,400,"FD3"),(6,600,"HJ2"),(1,900,"432"),(3,800,"DS2"),(2,500,"JT4")).toDF("ID","COST","CODE")
df: org.apache.spark.sql.DataFrame = [ID: int, COST: int ... 1 more field]

scala> df.show()
+---+----+----+
| ID|COST|CODE|
+---+----+----+
|  1| 100| AB1|
|  5| 200| BC3|
|  1| 400| FD3|
|  6| 600| HJ2|
|  1| 900| 432|
|  3| 800| DS2|
|  2| 500| JT4|
+---+----+----+

scala> df.createOrReplaceTempView("course")

scala> spark.sql(""" with tab1(select id,cost,code,collect_list(code) over(partition by id order by cost desc rows between current row and 5 following) cc, row_number() over(partition by id order by cost desc) rc, sum(cost) over(partition by id order by cost desc rows between current row and 5 following) total from course) select id, total, cc from tab1 where rc=1 """).show(false)
+---+-----+---------------+
|id |total|cc             |
+---+-----+---------------+
|1  |1400 |[432, FD3, AB1]|
|6  |600  |[HJ2]          |
|3  |800  |[DS2]          |
|5  |200  |[BC3]          |
|2  |500  |[JT4]          |
+---+-----+---------------+
```
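Both answers produce the code chain as an array column, while the desired output shows a comma-separated string. In Spark, `concat_ws(", ", $"cc")` joins the array column; its plain-Scala counterpart is `mkString`:

```scala
// Joining the collected codes into the "432, FD3, AB1" chain string.
val codeChain = Seq("432", "FD3", "AB1").mkString(", ")
```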