PySpark: combine dataframes of different lengths without duplicating
I have these three dataframes:
id | name
------------------------
1 | {"value": "bob"}
1 | {"value": "Robert"}
2 | {"value": "Mary"}
id | dob
----------------------------
1 | {"value": "21-04-1988"}
2 | {"value": null}
id | country
--------------------
1 | {"value": "IT"}
1 | {"value": "DE"}
2 | {"value": "FR"}
2 | {"value": "ES"}
I want to merge them, but without duplicating information:
id | name | dob |country
----------------------------------------------------------------------
1 | {"value": "bob"} | {"value": "21-04-1988"} | {"value": "IT"}
1 | {"value": "Robert"} | Null | {"value": "DE"}
2 | {"value": "Mary"} | {"value": Null} | {"value": "FR"}
2 | Null | Null | {"value": "ES"}
I tried using multiple outer joins, but I didn't get the result table above.
name = spark.createDataFrame(
    [
        (1, {"value": "bob"}),  # create your data here, be consistent in the types.
        (1, {"value": "Robert"}),
        (2, {"value": "Mary"}),
    ],
    ["id", "name"],  # add your column names here
)
dob = spark.createDataFrame(
    [
        (1, {"value": "21-04-1988"}),  # create your data here, be consistent in the types.
        (2, {"value": None}),
    ],
    ["id", "dob"],  # add your column names here
)
country = spark.createDataFrame(
    [
        (1, {"value": "IT"}),  # create your data here, be consistent in the types.
        (1, {"value": "DE"}),
        (2, {"value": "FR"}),
        (2, {"value": "ES"}),
    ],
    ["id", "country"],  # add your column names here
)
(name.join(dob, "id", "outer").join(country, "id", "outer")).show()
which produces this:
id | name | dob | country
---------------------------------------------------------------
1 | {"value":"Robert"} |{"value":"21-04-1988"} |{"value":"DE"}
1 | {"value":"Robert"} |{"value":"21-04-1988"} |{"value":"IT"}
1 | {"value":"bob"} |{"value":"21-04-1988"} |{"value":"DE"}
1 | {"value":"bob"} |{"value":"21-04-1988"} |{"value":"IT"}
2 | {"value":"Mary"} |{"value":null} |{"value":"ES"}
2 | {"value":"Mary"} |{"value":null} |{"value":"FR"}
Now, I understand that this is exactly how a full outer join works, but I don't need all that extra duplicated information; per id, I just need as many rows as the longest of the three dataframes has.
Any clues?
You can use row_number() to add a column rn to all three dataframes, then use it together with id as the join condition:
from pyspark.sql import functions as F, Window
w = Window.partitionBy("id").orderBy(F.lit(None)) # change this if you have some column to use for ordering
name = name.withColumn("rn", F.row_number().over(w))
dob = dob.withColumn("rn", F.row_number().over(w))
country = country.withColumn("rn", F.row_number().over(w))

result = (name.join(dob, ["id", "rn"], "full")
          .join(country, ["id", "rn"], "full")
          .drop("rn")
)
result.show(truncate=False)
#+---+-----------------+---------------------+-------------+
#|id |name |dob |country |
#+---+-----------------+---------------------+-------------+
#|1 |{value -> bob} |{value -> 21-04-1988}|{value -> IT}|
#|1 |{value -> Robert}|null |{value -> DE}|
#|2 |{value -> Mary} |{value -> null} |{value -> FR}|
#|2 |null |null |{value -> ES}|
#+---+-----------------+---------------------+-------------+
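If you need to align more than three dataframes on the same key, the same row_number() trick can be wrapped in a small helper. The sketch below is only illustrative (align_by_id is a hypothetical name, not a library function); it assumes every dataframe shares the id column and that row pairing within an id may be arbitrary, since orderBy(F.lit(None)) imposes no deterministic order:
from functools import reduce
from pyspark.sql import functions as F, Window

def align_by_id(*dfs, key="id"):
    # Number the rows within each id so they can be matched positionally.
    w = Window.partitionBy(key).orderBy(F.lit(None))
    numbered = [df.withColumn("rn", F.row_number().over(w)) for df in dfs]
    # Full-join everything on (id, rn), then drop the helper column.
    joined = reduce(lambda a, b: a.join(b, [key, "rn"], "full"), numbered)
    return joined.drop("rn")

result = align_by_id(name, dob, country)
result.show(truncate=False)
If the pairing matters (for example, "bob" must line up with "IT"), replace F.lit(None) with a real ordering column before computing row_number().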