How to iterate over an array column in Pyspark while joining
In PySpark I have dataframe_a:
+-----------+----------------------+
| str1 | array_of_str |
+-----------+----------------------+
| John | [mango, apple] |
| Tom | [mango, orange] |
| Matteo    | [apple, banana]      |
+-----------+----------------------+
and dataframe_b:
+-----------+----------------------+
| key | value |
+-----------+----------------------+
| mango | 1 |
| apple | 2 |
| orange    | 3                    |
+-----------+----------------------+
and I want to create a new column joined_result of array type that maps each element of array_of_str (in dataframe_a) to its value in dataframe_b, like:
+-----------+----------------------+----------------------------------+
| str1 | array_of_str | joined_result |
+-----------+----------------------+----------------------------------+
| John | [mango, apple] | [1, 2] |
| Tom | [mango, orange] | [1, 3] |
| Matteo    | [apple, banana]      | [2]                              |
+-----------+----------------------+----------------------------------+
I don't know how to do it. I know I can use a udf with a lambda function, but I can't get it to work :( Help!
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
# START EXTRACT OF CODE
ret = (df
       .select(['str1', 'array_of_str'])
       .withColumn('joined_result', F.udf(
           map(lambda x: ??????, ArrayType(StringType))
       )
      )
return ret
# END EXTRACT OF CODE
Thanks in advance
My answer to your question:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Collect the small lookup table to the driver and build a plain dict
lookup_dict = {row['key']: row['value'] for row in dataframe_b.collect()}

# Map each element to its value, skipping keys with no match (e.g. banana)
def mapper(keys):
    return [lookup_dict[key] for key in keys if key in lookup_dict]

dataframe_a = dataframe_a.withColumn('joined_result',
    F.udf(mapper, ArrayType(IntegerType()))('array_of_str'))
Just what you wanted :-)
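
As an aside: if dataframe_b is too large to collect to the driver, you can get the same result with a join instead of a UDF. A minimal sketch, assuming Spark 2.4+ (for transform); posexplode records each element's position so the original order can be restored after the join, and the inner join drops unmatched keys such as banana:

from pyspark.sql import functions as F

# Explode the array, keeping the position of each element
exploded = dataframe_a.select(
    'str1', 'array_of_str',
    F.posexplode('array_of_str').alias('pos', 'key'))

# Inner join against the lookup table, then reassemble the array in order
result = (exploded
          .join(dataframe_b, 'key', 'inner')
          .groupBy('str1', 'array_of_str')
          .agg(F.expr(
              "transform(sort_array(collect_list(struct(pos, value))), x -> x.value)"
          ).alias('joined_result')))

This keeps the whole computation inside Spark, so it scales when the lookup table doesn't fit on the driver, at the cost of a shuffle for the join and aggregation.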