Merge two Spark dataframes using array values
I have two Spark dataframes that look like this:
> cities_df
+----------+---------------------------+
| city_id| cities|
+----------+---------------------------+
| 22 |[Milan, Turin, Rome] |
+----------+---------------------------+
| 15 |[Naples, Florence, Genoa] |
+----------+---------------------------+
| 43 |[Houston, San Jose, Boston]|
+----------+---------------------------+
| 56 |[New York, Dallas, Chicago]|
+----------+---------------------------+
> countries_df
+----------+----------------------------------+
|country_id| countries|
+----------+----------------------------------+
| 680 |{'country': [56, 43], 'add': []} |
+----------+----------------------------------+
| 11 |{'country': [22, 15], 'add': [32]}|
+----------+----------------------------------+
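The country values in countries_df are the city IDs from the cities_df dataframe.
I need to merge these dataframes, replacing the city IDs under country with the corresponding values from the cities_df dataframe.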
Expected output:
country_id | countries | grouped_cities |
---|---|---|
680 | {'country': [56, 43], 'add': []} | [New York, Dallas, Chicago, Houston, San Jose, Boston] |
11 | {'country': [22, 15], 'add': [32]} | [Milan, Turin, Rome, Naples, Florence, Genoa] |
The resulting grouped_cities value does not have to be an array type; a string is fine.
How can I get this result using PySpark?
Input:
from pyspark.sql import functions as F
cities_df = spark.createDataFrame(
    [(22, ['Milan', 'Turin', 'Rome']),
     (15, ['Naples', 'Florence', 'Genoa']),
     (43, ['Houston', 'San Jose', 'Boston']),
     (56, ['New York', 'Dallas', 'Chicago'])],
    ['city_id', 'cities']
)
countries_df = spark.createDataFrame(
    [(680, {'country': [56, 43], 'add': []}),
     (11, {'country': [22, 15], 'add': [32]})],
    ['country_id', 'countries']
)
Script:
df_expl = countries_df.withColumn('city_id', F.explode('countries.country'))
df_joined = df_expl.join(cities_df, 'city_id', 'left')
df = df_joined.groupBy('country_id').agg(
    F.first('countries').alias('countries'),
    F.flatten(F.collect_list('cities')).alias('grouped_cities')
)
df.show(truncate=0)
# +----------+----------------------------------+------------------------------------------------------+
# |country_id|countries |grouped_cities |
# +----------+----------------------------------+------------------------------------------------------+
# |11 |{add -> [32], country -> [22, 15]}|[Naples, Florence, Genoa, Milan, Turin, Rome] |
# |680 |{add -> [], country -> [56, 43]} |[Houston, San Jose, Boston, New York, Dallas, Chicago]|
# +----------+----------------------------------+------------------------------------------------------+
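To see what the explode, left join and groupBy/flatten steps compute without starting a Spark session, the same logic can be modelled with plain Python dicts and lists. This is only a sketch of the semantics, not a replacement for the PySpark script above; the function name group_cities and the dict-based inputs are made up for illustration.

```python
def group_cities(cities, countries):
    """Plain-Python model of explode -> left join -> groupBy + flatten."""
    result = {}
    for country_id, mapping in countries.items():
        grouped = []
        # "explode": visit each city ID stored under the 'country' key
        for city_id in mapping['country']:
            # "left join": an ID with no match in cities contributes nothing
            grouped.extend(cities.get(city_id, []))
        result[country_id] = grouped
    return result


# Same sample data as the question, keyed by ID (hypothetical layout).
cities = {
    22: ['Milan', 'Turin', 'Rome'],
    15: ['Naples', 'Florence', 'Genoa'],
    43: ['Houston', 'San Jose', 'Boston'],
    56: ['New York', 'Dallas', 'Chicago'],
}
countries = {
    680: {'country': [56, 43], 'add': []},
    11: {'country': [22, 15], 'add': [32]},
}

grouped = group_cities(cities, countries)
print(grouped[680])
print(grouped[11])
```

This loop keeps the city IDs in the order they are listed, whereas Spark does not guarantee the element order of collect_list after a shuffle. That is why the Spark output above lists Naples before Milan while the expected output in the question has them the other way round.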
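Another approach. Use select on countries_df to add the exploded city_id column, join it with cities_df, then group by country_id and by the countries column cast to a string. Code below.
from pyspark.sql import functions as F
# Keep the DataFrame in `new`; calling .show() inline would assign None.
new = (
    cities_df
    .join(countries_df.select('*', F.explode('countries.country').alias('city_id')),
          how='left', on='city_id')
    .groupby('country_id', F.col('countries').cast('string').alias('countries'))
    .agg(F.flatten(F.collect_set('cities')).alias('cities'))
)
new.show(truncate=False)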
+----------+----------------------------------+------------------------------------------------------+
|country_id|countries |cities |
+----------+----------------------------------+------------------------------------------------------+
|11 |{add -> [32], country -> [22, 15]}|[Milan, Turin, Rome, Naples, Florence, Genoa] |
|680 |{add -> [], country -> [56, 43]} |[New York, Dallas, Chicago, Houston, San Jose, Boston]|
+----------+----------------------------------+------------------------------------------------------+