使用来自节点和参考数据框的 pyspark 动态创建链接
create links dynamically with pyspark from nodes and reference dataframe
我想根据这些数据框制作一个新的数据框:
DF1:
| Name | ID | Group |
| ----- | --- | -------- |
| A | 0 | mgr |
| B | 1 | mgr |
| C | 2 | mgr |
| D | 3 | hr |
| E | 4 | hr |
| F | 5 | hr |
| G | 6 | adm |
| H | 7 | adm |
| I | 8 | adm |
DF2:
| Mgrs | HR | Admin | Value |
| ------| ----- | -------- | ----- |
| A | D | G | .0010 |
| B | E | H | .0002 |
| C | F | I | .0035 |
我想通过 DF1 迭代 DF2 并生成一个新的数据帧:
DF3:
| From | To | Value|
| ----- | --- | -----|
| 0 | 3 | .0010|
| 1 | 4 | .0002|
| 2 | 5 | .0035|
| 3 | 6 | .0010|
| 4 | 7 | .0002|
| 5 | 8 | .0035|
我想从 DF1 (A) 的顶部开始检索 ID (0),将其放在 DF3 中的“发件人”列中,然后“收件人”列键将是正确的相邻DF2 中的列(DF2 的某些列 headers 对应于 DF1 中的组列中的值)。我还想将值列保留到 DF3 中,对应于 DF2 的行。关于如何使用 PySpark 完成此操作的任何建议?我在想也许某种形式的加入也能奏效。
正如您所说,这里可以使用几个连接
从获取 ID 开始
temp = (df2
.join(df1, on=df1['Name'] == df2['Mgrs'], how='left')
.select(F.col('ID').alias('Mgrs'), 'HR', 'Admin', 'Value')
.join(df1, on=df1['Name'] == df2['HR'], how='left')
.select('Mgrs', F.col('ID').alias('HR'), 'Admin', 'Value')
.join(df1, on=df1['Name'] == df2['Admin'], how='left')
.select('Mgrs', 'HR', F.col('ID').alias('Admin'), 'Value')
)
+----+---+-----+------+
|Mgrs| HR|Admin| Value|
+----+---+-----+------+
| 0| 3| 6| 0.001|
| 1| 4| 7|2.0E-4|
| 2| 5| 8|0.0035|
+----+---+-----+------+
然后处理您的映射逻辑
one = temp.select(F.col('Mgrs').alias('from'), F.col('HR').alias('to'), 'Value')
two = temp.select(F.col('HR').alias('from'), F.col('Admin').alias('to'), 'Value')
one.union(two).show()
+----+---+------+
|from| to| Value|
+----+---+------+
| 0| 3| 0.001|
| 1| 4|2.0E-4|
| 2| 5|0.0035|
| 3| 6| 0.001|
| 4| 7|2.0E-4|
| 5| 8|0.0035|
+----+---+------+
我想根据这些数据框制作一个新的数据框:
DF1:
| Name | ID | Group |
| ----- | --- | -------- |
| A | 0 | mgr |
| B | 1 | mgr |
| C | 2 | mgr |
| D | 3 | hr |
| E | 4 | hr |
| F | 5 | hr |
| G | 6 | adm |
| H | 7 | adm |
| I | 8 | adm |
DF2:
| Mgrs | HR | Admin | Value |
| ------| ----- | -------- | ----- |
| A | D | G | .0010 |
| B | E | H | .0002 |
| C | F | I | .0035 |
我想通过 DF1 迭代 DF2 并生成一个新的数据帧:
DF3:
| From | To | Value|
| ----- | --- | -----|
| 0 | 3 | .0010|
| 1 | 4 | .0002|
| 2 | 5 | .0035|
| 3 | 6 | .0010|
| 4 | 7 | .0002|
| 5 | 8 | .0035|
我想从 DF1 (A) 的顶部开始检索 ID (0),将其放在 DF3 中的“发件人”列中,然后“收件人”列键将是正确的相邻DF2 中的列(DF2 的某些列 headers 对应于 DF1 中的组列中的值)。我还想将值列保留到 DF3 中,对应于 DF2 的行。关于如何使用 PySpark 完成此操作的任何建议?我在想也许某种形式的加入也能奏效。
正如您所说,这里可以使用几个连接
从获取 ID 开始
temp = (df2
.join(df1, on=df1['Name'] == df2['Mgrs'], how='left')
.select(F.col('ID').alias('Mgrs'), 'HR', 'Admin', 'Value')
.join(df1, on=df1['Name'] == df2['HR'], how='left')
.select('Mgrs', F.col('ID').alias('HR'), 'Admin', 'Value')
.join(df1, on=df1['Name'] == df2['Admin'], how='left')
.select('Mgrs', 'HR', F.col('ID').alias('Admin'), 'Value')
)
+----+---+-----+------+
|Mgrs| HR|Admin| Value|
+----+---+-----+------+
| 0| 3| 6| 0.001|
| 1| 4| 7|2.0E-4|
| 2| 5| 8|0.0035|
+----+---+-----+------+
然后处理您的映射逻辑
one = temp.select(F.col('Mgrs').alias('from'), F.col('HR').alias('to'), 'Value')
two = temp.select(F.col('HR').alias('from'), F.col('Admin').alias('to'), 'Value')
one.union(two).show()
+----+---+------+
|from| to| Value|
+----+---+------+
| 0| 3| 0.001|
| 1| 4|2.0E-4|
| 2| 5|0.0035|
| 3| 6| 0.001|
| 4| 7|2.0E-4|
| 5| 8|0.0035|
+----+---+------+