How to iterate over an array column in Pyspark while joining

In pyspark, I have dataframe_a with:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple]       |
| Tom       | [mango, orange]      |
| Matteo    | [apple, banana]      |
+-----------+----------------------+

and dataframe_b:

+-----------+----------------------+
| key       | value                |
+-----------+----------------------+
| mango     | 1                    |
| apple     | 2                    |
| orange    | 3                    |
+-----------+----------------------+

And I want to create a new array-typed column, joined_result, that maps each element of array_of_str (in dataframe_a) to its value in dataframe_b, like this:

+-----------+----------------------+----------------------------------+
| str1      | array_of_str         | joined_result                    |
+-----------+----------------------+----------------------------------+
| John      | [mango, apple]       | [1, 2]                           |
| Tom       | [mango, orange]      | [1, 3]                           |
| Matteo    | [apple, banana]      | [2]                              |
+-----------+----------------------+----------------------------------+

I don't know how to do it. I know I could use a udf with a lambda function, but I can't get it to work :( Help!

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType

# START EXTRACT OF CODE
ret = (df
  .select(['str1', 'array_of_str'])
  .withColumn('joined_result', F.udf(
     map(lambda x: ??????, ArrayType(StringType))
  )
  )
)

return ret
# END EXTRACT OF CODE

Thanks in advance

My answer to your question:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Collect the (small) lookup table to the driver as a plain dict
lookup_list = map(lambda row: row.asDict(), dataframe_b.collect())
lookup_dict = {lookup['key']: lookup['value'] for lookup in lookup_list}

# Look up each element's value, skipping elements (like 'banana') with no match
def mapper(keys):
  return [lookup_dict[key] for key in keys if key in lookup_dict]

dataframe_a = dataframe_a.withColumn(
  'joined_result', F.udf(mapper, ArrayType(IntegerType()))('array_of_str'))
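
Note that this collects all of dataframe_b to the driver and ships the dict inside the udf's closure, which is fine while the lookup table is small; if it isn't, see the join-based sketch below.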

As you wished :-)
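
If dataframe_b is too large to collect, a pure-DataFrame alternative is to explode the array, join on the key, and regroup. This is only a minimal sketch reusing the question's dataframe_a/dataframe_b names, with one caveat: collect_list does not guarantee the values come back in the original array order.

from pyspark.sql import functions as F

# One row per array element, with an id so rows can be regrouped later
exploded = (dataframe_a
  .withColumn('id', F.monotonically_increasing_id())
  .withColumn('element', F.explode('array_of_str')))

# The default inner join drops elements (like 'banana') with no match
joined = exploded.join(dataframe_b, exploded.element == dataframe_b.key)

# Collect the matched values back into one array per original row
result = (joined
  .groupBy('id', 'str1', 'array_of_str')
  .agg(F.collect_list('value').alias('joined_result')))

You can drop the helper id column afterwards if you don't need it.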