在 PySpark 中匹配数组
Matching up arrays in PySpark
我正在尝试使用 PySpark 作为 AWS Glue 作业的一部分来操作两个数据帧。
df1:
item tag
1 AB
2 CD
3 EF
4 QQ
df2:
key1 key2 tags
A1 B1 [AB]
A1 B2 [AB, CD, EF]
A2 B1 [CD, EF]
A2 B3 [AB, EF, ZZ]
我想通过以下方式将 df2 中的数组与 df1 中的标签匹配:
item key1 key2 tag
1 A1 B1 AB
1 A1 B2 AB
2 A1 B2 CD
2 A2 B1 CD
3 A1 B2 EF
3 A2 B1 EF
3 A2 B3 EF
因此,df1中的标签用于根据df2中的标签条目扩展行。例如,项目 1 的标签“AB”出现在前两行的 df2 中的标签数组中。
还要注意 4 如何被忽略,因为标签 QQ 在 df2 的任何数组中都不存在。
我知道这将是一个内部联接,但我不确定如何将 df1.tag 与 df2.tags 匹配以引入 key1 和 key2。
如有任何帮助,我们将不胜感激。
您可以使用 array_contains
条件进行联接:
import pyspark.sql.functions as F
result = (df1.join(df2, F.array_contains(df2.tags, df1.tag))
.select('item', 'key1', 'key2', 'tag')
.orderBy('item', 'key1', 'key2')
)
result.show()
+----+----+----+---+
|item|key1|key2|tag|
+----+----+----+---+
| 1| A1| B1| AB|
| 1| A1| B2| AB|
| 1| A2| B3| AB|
| 2| A1| B2| CD|
| 2| A2| B1| CD|
| 3| A1| B2| EF|
| 3| A2| B1| EF|
| 3| A2| B3| EF|
+----+----+----+---+
import pyspark.sql.functions as F
df = df1.join(
df2.select('key1', 'key2', F.explode('tags').alias('tag')),
'tag',
'inner'
)
df.show()
# +---+----+----+----+
# |tag|item|key1|key2|
# +---+----+----+----+
# | EF| 3| A1| B2|
# | EF| 3| A2| B1|
# | EF| 3| A2| B3|
# | AB| 1| A1| B1|
# | AB| 1| A1| B2|
# | AB| 1| A2| B3|
# | CD| 2| A1| B2|
# | CD| 2| A2| B1|
# +---+----+----+----+
我正在尝试使用 PySpark 作为 AWS Glue 作业的一部分来操作两个数据帧。
df1:
item tag
1 AB
2 CD
3 EF
4 QQ
df2:
key1 key2 tags
A1 B1 [AB]
A1 B2 [AB, CD, EF]
A2 B1 [CD, EF]
A2 B3 [AB, EF, ZZ]
我想通过以下方式将 df2 中的数组与 df1 中的标签匹配:
item key1 key2 tag
1 A1 B1 AB
1 A1 B2 AB
2 A1 B2 CD
2 A2 B1 CD
3 A1 B2 EF
3 A2 B1 EF
3 A2 B3 EF
因此,df1中的标签用于根据df2中的标签条目扩展行。例如,项目 1 的标签“AB”出现在前两行的 df2 中的标签数组中。
还要注意 4 如何被忽略,因为标签 QQ 在 df2 的任何数组中都不存在。
我知道这将是一个内部联接,但我不确定如何将 df1.tag 与 df2.tags 匹配以引入 key1 和 key2。 如有任何帮助,我们将不胜感激。
您可以使用 array_contains
条件进行联接:
import pyspark.sql.functions as F
result = (df1.join(df2, F.array_contains(df2.tags, df1.tag))
.select('item', 'key1', 'key2', 'tag')
.orderBy('item', 'key1', 'key2')
)
result.show()
+----+----+----+---+
|item|key1|key2|tag|
+----+----+----+---+
| 1| A1| B1| AB|
| 1| A1| B2| AB|
| 1| A2| B3| AB|
| 2| A1| B2| CD|
| 2| A2| B1| CD|
| 3| A1| B2| EF|
| 3| A2| B1| EF|
| 3| A2| B3| EF|
+----+----+----+---+
import pyspark.sql.functions as F
df = df1.join(
df2.select('key1', 'key2', F.explode('tags').alias('tag')),
'tag',
'inner'
)
df.show()
# +---+----+----+----+
# |tag|item|key1|key2|
# +---+----+----+----+
# | EF| 3| A1| B2|
# | EF| 3| A2| B1|
# | EF| 3| A2| B3|
# | AB| 1| A1| B1|
# | AB| 1| A1| B2|
# | AB| 1| A2| B3|
# | CD| 2| A1| B2|
# | CD| 2| A2| B1|
# +---+----+----+----+