使用来自节点和参考数据框的 pyspark 动态创建链接

create links dynamically with pyspark from nodes and reference dataframe

我想根据这些数据框制作一个新的数据框:

DF1:

| Name  | ID  | Group    |
| ----- | --- | -------- |
| A     | 0   | mgr      |
| B     | 1   | mgr      |
| C     | 2   | mgr      |
| D     | 3   | hr       |
| E     | 4   | hr       |
| F     | 5   | hr       |
| G     | 6   | adm      |
| H     | 7   | adm      |
| I     | 8   | adm      |

DF2:

| Mgrs  | HR    | Admin    | Value |
| ------| ----- | -------- | ----- |
| A     | D     | G        | .0010 |
| B     | E     | H        | .0002 |
| C     | F     | I        | .0035 |

我想通过 DF1 迭代 DF2 并生成一个新的数据帧:

DF3:

| From  | To  | Value|
| ----- | --- | -----|
| 0     | 3   | .0010|
| 1     | 4   | .0002|
| 2     | 5   | .0035|
| 3     | 6   | .0010|
| 4     | 7   | .0002| 
| 5     | 8   | .0035|

我想从 DF1 (A) 的顶部开始检索 ID (0),将其放在 DF3 中的“发件人”列中,然后“收件人”列键将是正确的相邻DF2 中的列(DF2 的某些列 headers 对应于 DF1 中的组列中的值)。我还想将值列保留到 DF3 中,对应于 DF2 的行。关于如何使用 PySpark 完成此操作的任何建议?我在想也许某种形式的加入也能奏效。

正如您所说,这里可以使用几个连接

从获取 ID 开始
temp = (df2
    .join(df1, on=df1['Name'] == df2['Mgrs'], how='left')
    .select(F.col('ID').alias('Mgrs'), 'HR', 'Admin', 'Value')
 
    .join(df1, on=df1['Name'] == df2['HR'], how='left')
    .select('Mgrs', F.col('ID').alias('HR'), 'Admin', 'Value')
 
    .join(df1, on=df1['Name'] == df2['Admin'], how='left')
    .select('Mgrs', 'HR', F.col('ID').alias('Admin'), 'Value')
)

+----+---+-----+------+
|Mgrs| HR|Admin| Value|
+----+---+-----+------+
|   0|  3|    6| 0.001|
|   1|  4|    7|2.0E-4|
|   2|  5|    8|0.0035|
+----+---+-----+------+
然后处理您的映射逻辑
one = temp.select(F.col('Mgrs').alias('from'), F.col('HR').alias('to'), 'Value')
two = temp.select(F.col('HR').alias('from'), F.col('Admin').alias('to'), 'Value')
one.union(two).show()

+----+---+------+
|from| to| Value|
+----+---+------+
|   0|  3| 0.001|
|   1|  4|2.0E-4|
|   2|  5|0.0035|
|   3|  6| 0.001|
|   4|  7|2.0E-4|
|   5|  8|0.0035|
+----+---+------+