Join on element inside array
I have two DataFrames, and I need to use the values in one DataFrame to filter the second one.
For example, here are the datasets:
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.getOrCreate()
cust = spark.createDataFrame([Row(city='hyd', cust_id=100),
                              Row(city='blr', cust_id=101),
                              Row(city='chen', cust_id=102),
                              Row(city='mum', cust_id=103)])
item = spark.createDataFrame([Row(item='fish', geography=['london', 'a', 'b', 'hyd']),
                              Row(item='chicken', geography=['a', 'hyd', 'c']),
                              Row(item='rice', geography=['a', 'b', 'c', 'blr']),
                              Row(item='soup', geography=['a', 'kol', 'simla']),
                              Row(item='pav', geography=['a', 'del']),
                              Row(item='kachori', geography=['a', 'guj']),
                              Row(item='fries', geography=['a', 'chen']),
                              Row(item='noodles', geography=['a', 'mum'])])
Customer dataset output:
+----+-------+
|city|cust_id|
+----+-------+
| hyd|    100|
| blr|    101|
|chen|    102|
| mum|    103|
+----+-------+
Item dataset output:
+-------+-------------------+
|   item|          geography|
+-------+-------------------+
|   fish|[london, a, b, hyd]|
|chicken|        [a, hyd, c]|
|   rice|     [a, b, c, blr]|
|   soup|    [a, kol, simla]|
|    pav|           [a, del]|
|kachori|           [a, guj]|
|  fries|          [a, chen]|
|noodles|           [a, mum]|
+-------+-------------------+
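I need to use the values of the city column in the customer DataFrame to get the matching items from the item dataset. The final output should be: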
+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| hyd|[fish, chicken]|    100|
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| mum|      [noodles]|    103|
+----+---------------+-------+
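I would explode the array column before the join, so that each element of geography becomes its own row with a city value to join on. Then a collect_list aggregation gathers all the matching items back into a single list per customer.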
from pyspark.sql import functions as F
# one row per (item, city) pair, then join to the customers on city
df = cust.join(item.withColumn('city', F.explode('geography')), 'city', 'left')
df = (df.groupBy('city', 'cust_id')
        .agg(F.collect_list('item').alias('items'))
        .select('city', 'items', 'cust_id'))
df.show(truncate=False)
#+----+---------------+-------+
#|city|items          |cust_id|
#+----+---------------+-------+
#|blr |[rice]         |101    |
#|chen|[fries]        |102    |
#|hyd |[fish, chicken]|100    |
#|mum |[noodles]      |103    |
#+----+---------------+-------+
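To see what the three steps (explode, join on city, collect_list) do to the sample rows without a Spark session, here is a plain-Python model of them. This is a hypothetical sketch, not Spark code: lists of tuples stand in for the two DataFrames, and items_per_customer is an illustrative name. It also returns rows in a fixed order, whereas in Spark neither the row order nor the element order produced by collect_list is guaranteed.

```python
from collections import defaultdict

# Plain-Python stand-ins for the cust and item DataFrames from the question.
cust = [("hyd", 100), ("blr", 101), ("chen", 102), ("mum", 103)]
item = [
    ("fish", ["london", "a", "b", "hyd"]),
    ("chicken", ["a", "hyd", "c"]),
    ("rice", ["a", "b", "c", "blr"]),
    ("soup", ["a", "kol", "simla"]),
    ("pav", ["a", "del"]),
    ("kachori", ["a", "guj"]),
    ("fries", ["a", "chen"]),
    ("noodles", ["a", "mum"]),
]


def items_per_customer(cust, item):
    # Step 1 (explode): one (city, item) pair per element of geography.
    exploded = [(city, name) for name, geography in item for city in geography]

    # Step 2 (join on city): index the exploded pairs by city.
    by_city = defaultdict(list)
    for city, name in exploded:
        by_city[city].append(name)

    # Step 3 (left join + collect_list): every customer is kept and gets the
    # list of items whose geography contains its city ([] if none match).
    return [(city, by_city.get(city, []), cust_id) for city, cust_id in cust]


for row in items_per_customer(cust, item):
    print(row)
```

Because the model mimics a left join from the customer side, a customer whose city appears in no geography array still gets a row, with an empty list.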
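Alternatively, start from the item side: explode geography, join to cust on city, then drop the rows that matched no customer.
from pyspark.sql.functions import explode, col, collect_list
new = (
    # explode geography into one row per city, then join to cust on city
    item.withColumn('city', explode(col('geography')))
        .join(cust, how='left', on='city')
        # drop rows with no matching customer (null cust_id) and the array column
        .dropna().drop('geography')
        # group per customer and collect the matching items
        .groupby('city', 'cust_id').agg(collect_list('item').alias('items'))
        # reorder the columns to match the desired output
        .select('city', 'items', 'cust_id')
)
new.show()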
+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| hyd|[fish, chicken]|    100|
| mum|      [noodles]|    103|
+----+---------------+-------+