Scala Spark - dealing with hierarchical data tables

My data table has a hierarchical, tree-structured data model. For example, here are some sample data rows:

-------------------------------------------
Id | name    |parentId | path       | depth
-------------------------------------------
55 | Canada  | null    | null       | 0
77 | Ontario |  55     | /55        | 1
100| Toronto |  77     | /55/77     | 2
104| Brampton| 100     | /55/77/100 | 3

I want to convert these rows into a flattened version; a sample output would be:

-----------------------------------
Id | name     | parentId | depth
------------------------------------
104| Brampton | Toronto  | 3
100| Toronto  | Ontario  | 2
77 | Ontario  | Canada   | 1
55 | Canada   | None     | 0
100| Toronto  | Ontario  | 2
77 | Ontario  | Canada   | 1
55 | Canada   | None     | 0
77 | Ontario  | Canada   | 1
55 | Canada   | None     | 0
55 | Canada   | None     | 0

I tried using a Cartesian join and an n²-style search, but neither worked.

Here is one approach:

//Create a DF with the sample data (assumes a SparkSession named spark is in scope, e.g. in spark-shell)
import org.apache.spark.sql.functions._
import spark.implicits._

def getSeq(s: String): Seq[String] = s.split('|').map(_.trim).toSeq

var l = getSeq("77 | Ontario |  55     | /55        | 1") :: Nil
l :+= getSeq("55 | Canada  | null    | null       | 0")
l :+= getSeq("100| Toronto |  77     | /55/77     | 2")
l :+= getSeq("104| Brampton| 100     | /55/77/100 | 3")
val df = l.map { case Seq(a, b, c, d, e) => (a, b, c, d, e) }.toDF("Id", "name", "parentId", "path", "depth")

//Original DF plus parentName, via a self join on parentId = Id
val dfWithPar = df.as("df1")
  .join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter")
  .select($"df1.Id", $"df1.name", $"df1.parentId", $"df1.path", $"df1.depth", $"df2.name".as("parentName"))

//Strip the leading "/" from path, split on "/", and explode into one row per ancestor Id
val dfExploded = dfWithPar
  .withColumn("path", regexp_replace($"path", "^/", ""))
  .withColumn("path", split($"path", "/"))
  .withColumn("path", explode($"path"))

//Join the original with the exploded DF to get the extra rows, one per ancestor in each path
val dfJoined = dfWithPar
  .join(dfExploded, dfWithPar.col("Id") === dfExploded.col("path"))
  .select(dfWithPar.col("Id"), dfWithPar.col("name"), dfWithPar.col("parentId"),
    dfWithPar.col("path"), dfWithPar.col("depth"), dfWithPar.col("parentName"))

//Get the final result by adding the addendum to orig
dfWithPar.union(dfJoined).select($"Id", $"name", $"parentName", $"depth").show

+---+--------+----------+-----+
| Id|    name|parentName|depth|
+---+--------+----------+-----+
| 77| Ontario|    Canada|    1|
| 55|  Canada|      null|    0|
|100| Toronto|   Ontario|    2|
|104|Brampton|   Toronto|    3|
| 77| Ontario|    Canada|    1|
| 77| Ontario|    Canada|    1|
| 55|  Canada|      null|    0|
| 55|  Canada|      null|    0|
| 55|  Canada|      null|    0|
|100| Toronto|   Ontario|    2|
+---+--------+----------+-----+

A conditional self-join that selects the appropriate columns should work for you.
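
If it helps to see that step in isolation, here is a minimal sketch of just the conditional self-join, stripped of the path handling (it reuses the df built above; withParentName is an illustrative name):

// Minimal sketch: resolve each row's parentId to the parent's name via a self join.
val withParentName = df.as("child")
  .join(df.as("parent"), $"child.parentId" === $"parent.Id", "left_outer")
  .select($"child.Id", $"child.name", $"parent.name".as("parentName"), $"child.depth")
withParentName.show()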

The solution is a little tricky, since you need to find every parent name from the path column combined with the parentId column, which calls for the concat_ws, split and explode built-in functions. The rest of the process is just joins, selects and fills.

Given the dataframe:

+---+--------+--------+----------+-----+
|Id |name    |parentId|path      |depth|
+---+--------+--------+----------+-----+
|55 |Canada  |null    |null      |0    |
|77 |Ontario |55      |/55       |1    |
|100|Toronto |77      |/55/77    |2    |
|104|Brampton|100     |/55/77/100|3    |
+---+--------+--------+----------+-----+

you can generate a temporary dataframe for the final join with a self join as

// Self join: resolve each parentId to the parent's name; the root's null parent becomes "None"
val df2 = df.as("table1")
  .join(df.as("table2"), col("table1.parentId") === col("table2.Id"), "left")
  .select(col("table1.Id").as("path"), col("table1.name").as("name"), col("table2.name").as("parentId"), col("table1.depth").as("depth"))
  .na.fill("None")
//    +----+--------+--------+-----+
//    |path|name    |parentId|depth|
//    +----+--------+--------+-----+
//    |55  |Canada  |None    |0    |
//    |77  |Ontario |Canada  |1    |
//    |100 |Toronto |Ontario |2    |
//    |104 |Brampton|Toronto |3    |
//    +----+--------+--------+-----+

and the desired dataframe can be obtained with

df.withColumn("path", explode(split(concat_ws("", col("parentId"), col("path")), "/")))
  .as("table1")
  .join(df2.as("table2"), Seq("path"), "right")
  .select(col("table2.path").as("Id"), col("table2.name").as("name"), col("table2.parentId").as("parentId"), col("table2.depth").as("depth"))
  .na.fill("0")
  .show(false)
//    +---+--------+--------+-----+
//    |Id |name    |parentId|depth|
//    +---+--------+--------+-----+
//    |55 |Canada  |None    |0    |
//    |55 |Canada  |None    |0    |
//    |55 |Canada  |None    |0    |
//    |55 |Canada  |None    |0    |
//    |77 |Ontario |Canada  |1    |
//    |77 |Ontario |Canada  |1    |
//    |77 |Ontario |Canada  |1    |
//    |100|Toronto |Ontario |2    |
//    |100|Toronto |Ontario |2    |
//    |104|Brampton|Toronto |3    |
//    +---+--------+--------+-----+

Explanation

Take the row |104|Brampton|100 |/55/77/100|3 |:

concat_ws("", col("parentId"), col("path")) generates |104|Brampton|100 |100/55/77/100|3 |; as you can see, the parentId 100 gets concatenated at the front of the path.
split(concat_ws("", col("parentId"), col("path")), "/") then generates an array column: |104|Brampton|100 |[100, 55, 77, 100]|3 |.
And explode(split(concat_ws("", col("parentId"), col("path")), "/")) as a whole explodes the array column into separate rows:

|104|Brampton|100     |100     |3    |
|104|Brampton|100     |55      |3    |
|104|Brampton|100     |77      |3    |
|104|Brampton|100     |100     |3    |
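
If you want to inspect those intermediate stages yourself, a quick sketch (reusing the df from above; the pathStr and pathArr column names are only for illustration):

// Show each intermediate stage on the df defined above.
df.withColumn("pathStr", concat_ws("", col("parentId"), col("path"))).show(false)   // parentId prepended to path
df.withColumn("pathArr", split(concat_ws("", col("parentId"), col("path")), "/")).show(false)   // array column
df.withColumn("path", explode(split(concat_ws("", col("parentId"), col("path")), "/"))).show(false)   // one row per array element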

The joins are easier to follow, so they shouldn't need an explanation ;)

I hope the answer is helpful.

Here is another version:

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sparkConf = new SparkConf().setAppName("pathtest").setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

val dfA = spark.createDataset(Seq(
  (55, "Canada", -1, "", 0),
  (77, "Ontario", 55, "/55", 1),
  (100, "Toronto", 77, "/55/77", 2),
  (104, "Brampton", 100, "/55/77/100", 3))
)
.toDF("Id", "name", "parentId", "path", "depth")


// Split path into an array; for the root row (no "/") return an array holding
// a single null so that explode still emits one row for it.
def getArray = udf((path: String) => {
  if (path.contains("/"))
    path.split("/")
  else
    Array[String](null)
})

val dfB = dfA
  .withColumn("path", getArray(col("path")))
  .withColumn("path", explode(col("path")))

dfB.as("B").join(dfA.as("A"), $"B.parentId" === $"A.Id", "left")
  .select($"B.Id".as("Id"), $"B.name".as("name"), $"A.name".as("parent"), $"B.depth".as("depth"))
  .show()

There are two dataframes, dfA and dfB, where dfB is generated from the first by exploding the path array with a udf. Note the trick for Canada: the udf returns an array containing a single null, because otherwise explode would not generate a row for it at all.
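
For what it's worth, the same array could also be built without a udf, using only the when/otherwise, split, array and lit built-ins; a minimal sketch (dfB2 is an illustrative name):

// Alternative to the udf: for the root row (no "/" in path) emit an array
// holding a single null so that explode still produces one row for it.
val dfB2 = dfA
  .withColumn("path",
    when(col("path").contains("/"), split(col("path"), "/"))
      .otherwise(array(lit(null).cast("string"))))
  .withColumn("path", explode(col("path")))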

dfB looks like this:

+---+--------+--------+----+-----+
| Id|    name|parentId|path|depth|
+---+--------+--------+----+-----+
| 55|  Canada|      -1|null|    0|
| 77| Ontario|      55|    |    1|
| 77| Ontario|      55|  55|    1|
|100| Toronto|      77|    |    2|
|100| Toronto|      77|  55|    2|
|100| Toronto|      77|  77|    2|
|104|Brampton|     100|    |    3|
|104|Brampton|     100|  55|    3|
|104|Brampton|     100|  77|    3|
|104|Brampton|     100| 100|    3|
+---+--------+--------+----+-----+ 

And the final result is:

+---+--------+-------+-----+
| Id|    name| parent|depth|
+---+--------+-------+-----+
| 55|  Canada|   null|    0|
| 77| Ontario| Canada|    1|
| 77| Ontario| Canada|    1|
|100| Toronto|Ontario|    2|
|100| Toronto|Ontario|    2|
|100| Toronto|Ontario|    2|
|104|Brampton|Toronto|    3|
|104|Brampton|Toronto|    3|
|104|Brampton|Toronto|    3|
|104|Brampton|Toronto|    3|
+---+--------+-------+-----+