Collect rows as an array after a group by on a Spark dataframe using PySpark
I have the following df:
url | source | value | name
----------------------------------
a | USA | 1 | registry_a
a | USA | 1 | registry_b
a | FRA | 1 | registry_a
b | DEU | 2 | null
b | DEU | 1 | registry_b
b | FRA | 1 | registry_a
c | ITA | 1 | registry_a
c | ITA | 0 | registry_b
I would like to group by url and source, and create a new column named data that holds all of the group's rows as an array:
url | source | data
----------------------------------------------------------------------------------
a | USA | [{"url": "a", "source": "USA", "value": 1, "name": "registry_a"},
 | | {"url": "a", "source": "USA", "value": 1, "name": "registry_b"}]
a | FRA | [{"url": "a", "source": "FRA", "value": 1, "name": "registry_a"}]
b | DEU | [{"url": "b", "source": "DEU", "value": 2, "name": null},
 | | {"url": "b", "source": "DEU", "value": 1, "name": "registry_b"}]
b | FRA | [{"url": "b", "source": "FRA", "value": 1, "name": "registry_a"}]
c | ITA | [{"url": "c", "source": "ITA", "value": 1, "name": "registry_a"},
| | {"url": "c", "source": "ITA", "value": 0, "name": "registry_b"}]
I tried this, but it does not work:
samples_to_map_df = (samples
.groupBy("url", "source")
.agg(F.map_from_arrays(F.collect_list(F.col("url")),
F.collect_list(F.col("source")),
F.collect_list(F.col("value")),
F.collect_list(F.col("value")))
.alias("data"))
)
I get this error:
TypeError: map_from_arrays() takes 2 positional arguments but 4 were given
You need to collect a list of structs, then apply the to_json function to get the desired output:
import pyspark.sql.functions as F
samples_to_map_df = samples.groupBy("url", "source").agg(
F.to_json(
F.collect_list(
F.struct(*[F.col(c).alias(c) for c in samples.columns])
)
).alias("data")
)
samples_to_map_df.show(truncate=False)
#+---+------+-----------------------------------------------------------------------------------------------------------------------+
#|url|source|data |
#+---+------+-----------------------------------------------------------------------------------------------------------------------+
#|a |USA |[{"url":"a","source":"USA","value":"1","name":"registry_a"},{"url":"a","source":"USA","value":"1","name":"registry_b"}]|
#|c |ITA |[{"url":"c","source":"ITA","value":"1","name":"registry_a"},{"url":"c","source":"ITA","value":"0","name":"registry_b"}]|
#|b |DEU |[{"url":"b","source":"DEU","value":"2"},{"url":"b","source":"DEU","value":"1","name":"registry_b"}] |
#|a |FRA |[{"url":"a","source":"FRA","value":"1","name":"registry_a"}] |
#|b |FRA |[{"url":"b","source":"FRA","value":"1","name":"registry_a"}] |
#+---+------+-----------------------------------------------------------------------------------------------------------------------+