使用 table 转换修复层次结构数据(Hive、scala、spark)
Fixing hierarchy data with table transformation (Hive, scala, spark)
我的任务是处理层次结构数据,但源数据在层次结构中包含错误,即:某些父子链接已损坏。我有一个重新建立这种连接的算法,但我还不能自己实现它。
例子:
初始数据为
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
示意图如下:
如您所见,与 C1 和 D3 的连接在这里丢失。
为了恢复连接,我需要为此应用以下算法 table:
如果对于某些 NAME,ID 不在 PARENTID 列中(例如 ID = 18、10),则创建一个包含 'parent' 的行,其中 LEVEL = (current LEVEL - 1) and PARENTID = (当前 ID),并取 ID 和 NAME,使得当前 ID < 来自以上 LEVEL 的节点的 ID。
结果必须是这样的:
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| B1 | 2 | 18 | 2 |#
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| C2 | 3 | 10 | 3 |#
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
其中带有 # 的行 - 新行 created.And 新架构如下所示:
关于如何在 spark/scala 中执行此算法有任何想法吗?谢谢!
您可以从当前数据帧构建一个 createdRows
数据帧,将其与当前数据帧合并以获得最终数据帧。
您可以分几步构建此 createdRows
数据框:
- 第一步是获取不在 PARENTID 列中的 ID(和 LEVEL)。您可以使用自左反连接来做到这一点。
- 然后,您将
ID
列重命名为 PARENTID
并更新 LEVEL
列,将其减少 1
。
- 然后,通过将新行的
ID
和 NAME
列与 LEVEL
列 上的输入数据框连接起来
- 最后,你应用你的条件
ID
< PARENTID
您最终得到以下代码,dataframe
是包含您的初始数据的数据框:
import org.apache.spark.sql.functions.col
val createdRows = dataframe
// if for some NAME the ID is not in the PARENTID column (like ID = 18, 10)
.select("LEVEL", "ID")
.filter(col("LEVEL") > 1) // Remove root node from created rows
.join(dataframe.select("PARENTID"), col("PARENTID") === col("ID"), "left_anti")
// then create a row with a 'parent' with LEVEL = (current LEVEL - 1) and PARENTID = (current ID)
.withColumnRenamed("ID", "PARENTID")
.withColumn("LEVEL", col("LEVEL") - 1)
// and take ID and NAME
.join(dataframe.select("NAME", "ID", "LEVEL"), Seq("LEVEL"))
// such that the current ID < ID of the node from the LEVEL above.
.filter(col("ID") < col("PARENTID"))
val result = dataframe
.unionByName(createdRows)
.orderBy("NAME", "PARENTID") // Optional, if you want an ordered result
在 result
数据框中你得到:
+----+---+--------+-----+
|NAME|ID |PARENTID|LEVEL|
+----+---+--------+-----+
|A1 |1 |2 |1 |
|B1 |2 |3 |2 |
|B1 |2 |18 |2 |
|C1 |18 |4 |3 |
|C2 |3 |5 |3 |
|C2 |3 |10 |3 |
|D1 |4 |null |4 |
|D2 |5 |null |4 |
|D3 |10 |11 |4 |
|E1 |11 |null |5 |
+----+---+--------+-----+
我的任务是处理层次结构数据,但源数据在层次结构中包含错误,即:某些父子链接已损坏。我有一个重新建立这种连接的算法,但我还不能自己实现它。 例子: 初始数据为
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
示意图如下:
如您所见,与 C1 和 D3 的连接在这里丢失。 为了恢复连接,我需要为此应用以下算法 table:
如果对于某些 NAME,ID 不在 PARENTID 列中(例如 ID = 18、10),则创建一个包含 'parent' 的行,其中 LEVEL = (current LEVEL - 1) and PARENTID = (当前 ID),并取 ID 和 NAME,使得当前 ID < 来自以上 LEVEL 的节点的 ID。
结果必须是这样的:
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| B1 | 2 | 18 | 2 |#
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| C2 | 3 | 10 | 3 |#
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
其中带有 # 的行 - 新行 created.And 新架构如下所示:
关于如何在 spark/scala 中执行此算法有任何想法吗?谢谢!
您可以从当前数据帧构建一个 createdRows
数据帧,将其与当前数据帧合并以获得最终数据帧。
您可以分几步构建此 createdRows
数据框:
- 第一步是获取不在 PARENTID 列中的 ID(和 LEVEL)。您可以使用自左反连接来做到这一点。
- 然后,您将
ID
列重命名为PARENTID
并更新LEVEL
列,将其减少1
。 - 然后,通过将新行的
ID
和NAME
列与LEVEL
列 上的输入数据框连接起来
- 最后,你应用你的条件
ID
<PARENTID
您最终得到以下代码,dataframe
是包含您的初始数据的数据框:
import org.apache.spark.sql.functions.col
val createdRows = dataframe
// if for some NAME the ID is not in the PARENTID column (like ID = 18, 10)
.select("LEVEL", "ID")
.filter(col("LEVEL") > 1) // Remove root node from created rows
.join(dataframe.select("PARENTID"), col("PARENTID") === col("ID"), "left_anti")
// then create a row with a 'parent' with LEVEL = (current LEVEL - 1) and PARENTID = (current ID)
.withColumnRenamed("ID", "PARENTID")
.withColumn("LEVEL", col("LEVEL") - 1)
// and take ID and NAME
.join(dataframe.select("NAME", "ID", "LEVEL"), Seq("LEVEL"))
// such that the current ID < ID of the node from the LEVEL above.
.filter(col("ID") < col("PARENTID"))
val result = dataframe
.unionByName(createdRows)
.orderBy("NAME", "PARENTID") // Optional, if you want an ordered result
在 result
数据框中你得到:
+----+---+--------+-----+
|NAME|ID |PARENTID|LEVEL|
+----+---+--------+-----+
|A1 |1 |2 |1 |
|B1 |2 |3 |2 |
|B1 |2 |18 |2 |
|C1 |18 |4 |3 |
|C2 |3 |5 |3 |
|C2 |3 |10 |3 |
|D1 |4 |null |4 |
|D2 |5 |null |4 |
|D3 |10 |11 |4 |
|E1 |11 |null |5 |
+----+---+--------+-----+