使用 table 转换修复层次结构数据(Hive、scala、spark)

Fixing hierarchy data with table transformation (Hive, scala, spark)

我的任务是处理层次结构数据,但源数据在层次结构中包含错误,即:某些父子链接已损坏。我有一个重新建立这种连接的算法,但我还不能自己实现它。 例子: 初始数据为

+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1   |  1 | 2        |     1 |
| B1   |  2 | 3        |     2 |
| C1   | 18 | 4        |     3 |
| C2   |  3 | 5        |     3 |
| D1   |  4 | NULL     |     4 |
| D2   |  5 | NULL     |     4 |
| D3   | 10 | 11       |     4 |
| E1   | 11 | NULL     |     5 |
+------+----+----------+-------+

示意图如下:

如您所见,与 C1 和 D3 的连接在这里丢失。 为了恢复连接,我需要为此应用以下算法 table:

如果对于某些 NAME,ID 不在 PARENTID 列中(例如 ID = 18、10),则创建一个包含 'parent' 的行,其中 LEVEL = (current LEVEL - 1) and PARENTID = (当前 ID),并取 ID 和 NAME,使得当前 ID < 来自以上 LEVEL 的节点的 ID。

结果必须是这样的:

+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1   |  1 | 2        |     1 |
| B1   |  2 | 3        |     2 |
| B1   |  2 | 18       |     2 |#
| C1   | 18 | 4        |     3 |
| C2   |  3 | 5        |     3 |
| C2   |  3 | 10       |     3 |#
| D1   |  4 | NULL     |     4 |
| D2   |  5 | NULL     |     4 |
| D3   | 10 | 11       |     4 |
| E1   | 11 | NULL     |     5 |
+------+----+----------+-------+

其中带有 # 的行 - 新行 created.And 新架构如下所示:

关于如何在 spark/scala 中执行此算法有任何想法吗?谢谢!

您可以从当前数据帧构建一个 createdRows 数据帧,将其与当前数据帧合并以获得最终数据帧。

您可以分几步构建此 createdRows 数据框:

  • 第一步是获取不在 PARENTID 列中的 ID(和 LEVEL)。您可以使用自左反连接来做到这一点。
  • 然后,您将 ID 列重命名为 PARENTID 并更新 LEVEL 列,将其减少 1
  • 然后,通过将新行的 IDNAME 列与 LEVEL
  • 上的输入数据框连接起来
  • 最后,你应用你的条件 ID < PARENTID

您最终得到以下代码,dataframe 是包含您的初始数据的数据框:

import org.apache.spark.sql.functions.col

val createdRows = dataframe
  // if for some NAME the ID is not in the PARENTID column (like ID = 18, 10)
  .select("LEVEL", "ID")
  .filter(col("LEVEL") > 1) // Remove root node from created rows
  .join(dataframe.select("PARENTID"), col("PARENTID") === col("ID"), "left_anti")
  // then create a row with a 'parent' with LEVEL = (current LEVEL - 1) and PARENTID = (current ID)
  .withColumnRenamed("ID", "PARENTID")
  .withColumn("LEVEL", col("LEVEL") - 1)
  // and take ID and NAME  
  .join(dataframe.select("NAME", "ID", "LEVEL"), Seq("LEVEL"))
  // such that the current ID < ID of the node from the LEVEL above.
  .filter(col("ID") < col("PARENTID"))

val result = dataframe
  .unionByName(createdRows)
  .orderBy("NAME", "PARENTID") // Optional, if you want an ordered result

result 数据框中你得到:

+----+---+--------+-----+
|NAME|ID |PARENTID|LEVEL|
+----+---+--------+-----+
|A1  |1  |2       |1    |
|B1  |2  |3       |2    |
|B1  |2  |18      |2    |
|C1  |18 |4       |3    |
|C2  |3  |5       |3    |
|C2  |3  |10      |3    |
|D1  |4  |null    |4    |
|D2  |5  |null    |4    |
|D3  |10 |11      |4    |
|E1  |11 |null    |5    |
+----+---+--------+-----+