Join on element inside array

I have two DataFrames, and I need to use the values from one of them to filter the second.

For example, here are the datasets:

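from pyspark.sql import Row, SparkSession

# Reuse the active session (e.g. in a notebook or the pyspark shell) or create one
spark = SparkSession.builder.getOrCreate()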

cust = spark.createDataFrame([Row(city='hyd',cust_id=100),
                              Row(city='blr',cust_id=101),
                              Row(city='chen',cust_id=102),
                              Row(city='mum',cust_id=103)])

item = spark.createDataFrame([Row(item='fish',geography=['london','a','b','hyd']),
                              Row(item='chicken',geography=['a','hyd','c']),
                              Row(item='rice',geography=['a','b','c','blr']),
                              Row(item='soup',geography=['a','kol','simla']),
                              Row(item='pav',geography=['a','del']),
                              Row(item='kachori',geography=['a','guj']),
                              Row(item='fries',geography=['a','chen']),
                              Row(item='noodles',geography=['a','mum'])])

Customer dataset output:

+----+-------+
|city|cust_id|
+----+-------+
| hyd|    100|
| blr|    101|
|chen|    102|
| mum|    103|
+----+-------+

Item dataset output:

+-------+-------------------+
|   item|          geography|
+-------+-------------------+
|   fish|[london, a, b, hyd]|
|chicken|        [a, hyd, c]|
|   rice|     [a, b, c, blr]|
|   soup|    [a, kol, simla]|
|    pav|           [a, del]|
|kachori|           [a, guj]|
|  fries|          [a, chen]|
|noodles|           [a, mum]|
+-------+-------------------+

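I need to use the city column values from the customer DataFrame to get the matching items from the item dataset. The final output should be: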

+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| hyd|[fish, chicken]|    100|
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| mum|      [noodles]|    103|
+----+---------------+-------+

Before the join, I would explode the array column. Then a collect_list aggregation gathers all the items for each city into one list.

from pyspark.sql import functions as F

df = cust.join(item.withColumn('city', F.explode('geography')), 'city', 'left')
df = (df.groupBy('city', 'cust_id')
        .agg(F.collect_list('item').alias('items'))
        .select('city', 'items', 'cust_id')
)
df.show(truncate=False)
#+----+---------------+-------+
#|city|items          |cust_id|
#+----+---------------+-------+
#|blr |[rice]         |101    |
#|chen|[fries]        |102    |
#|hyd |[fish, chicken]|100    |
#|mum |[noodles]      |103    |
#+----+---------------+-------+
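
Another option is to explode on the item side, join to cust, and then group:

from pyspark.sql.functions import col, collect_list, explode

new = (
    # explode geography into one row per city, then join to cust on city
    item.withColumn('city', explode(col('geography')))
    .join(cust, how='left', on='city')
    # drop rows with no matching customer (nulls) and the unwanted column
    .dropna().drop('geography')
    # group by city and customer to collect the items into a list
    .groupby('city', 'cust_id').agg(collect_list('item').alias('items'))
    # reorder the columns to match the expected output
    .select('city', 'items', 'cust_id')
)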

new.show()

+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| hyd|[fish, chicken]|    100|
| mum|      [noodles]|    103|
+----+---------------+-------+
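
To show what the explode-then-collect pattern in both answers does, here is a rough plain-Python sketch on the same sample data. It needs no Spark. The function name `items_per_city` is made up for illustration, and the code only mimics the semantics of `explode`, `collect_list`, and a left join from the customer side. It is not how Spark executes them.

```python
from collections import defaultdict

# Same sample data as in the question, as plain Python tuples.
cust = [("hyd", 100), ("blr", 101), ("chen", 102), ("mum", 103)]
item = [
    ("fish", ["london", "a", "b", "hyd"]),
    ("chicken", ["a", "hyd", "c"]),
    ("rice", ["a", "b", "c", "blr"]),
    ("soup", ["a", "kol", "simla"]),
    ("pav", ["a", "del"]),
    ("kachori", ["a", "guj"]),
    ("fries", ["a", "chen"]),
    ("noodles", ["a", "mum"]),
]


def items_per_city(cust, item):
    """Hypothetical helper: imitate explode + join + collect_list."""
    # "explode": one (city, item_name) pair per element of each geography array
    exploded = [(city, name) for name, geography in item for city in geography]

    # "collect_list": gather item names per city, in encounter order
    by_city = defaultdict(list)
    for city, name in exploded:
        by_city[city].append(name)

    # "left join" from cust: every customer is kept, with an empty list
    # when no item lists that customer's city
    return [(city, by_city.get(city, []), cust_id) for city, cust_id in cust]


result = items_per_city(cust, item)
for row in result:
    print(row)
```

The ordering here is deterministic because Python lists preserve order. In Spark, the row order after a groupBy and the element order inside collect_list are not guaranteed.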