如何避免 Spark 中的嵌套地图调用?
How to avoid nested map calls in Spark?
我有一个交易列表,其中用户将板从一个站点带到另一个站点。这是一个数组的数组,叫做trans
:
Board: User: Station: Action: Time:
[
['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'],
['1', 'Ana', 'Soho' , 'park' , '2:00pm'],
['1', 'Bob', 'Soho' , 'check_out', '3:00pm'],
['1', 'Bob', 'Chelsea', 'park' , '4:00pm'],
['2'...]
]
(例如,董事会 '1' 在 '1:00pm' 由 'Ana' 在 'Tribeca' 签出,在 '2:00pm' 停在 Soho,然后由 'Bob').
使用这行代码,我将每个板分组到其交易:
board_groups = trans
.map(lambda oneTrans:
(oneTrans[0],
[oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
.groupByKey()
.mapValues(list)
给予:
('1', [
['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
['Ana' , 'Soho' , 'park' , '2:00pm'],
['Bob' , 'Soho' , 'check_out', '3:00pm'],
['Bob' , 'Chelsea' , 'park' , '4:00pm' ]
]),
('2', ...)
我怎样才能做到这一点:
('1', ('Ana',
[
['Tribeca', 'check_out', '1:00pm'],
['Soho' , 'park' , '2:00pm']
]
),
('Bob',
[
['Soho' , 'check_out', '3:00pm'],
['Chelsea', 'park' , '4:00pm' ]
]
)
),
('2'...)
使用 Spark 的方法?
由于 board_groups
的每个值都是一个数组数组,我尝试在其上使用 mapValue
,这样对于每个值中的每个数组,我可以 map
到使每个用户成为一个密钥,然后将用户分组在一起。但是我不能这样做,因为这涉及嵌套并行化调用。
假设你的数据是
df.show
+-----+----+-------+---------+------+
|board|user|station| action| time|
+-----+----+-------+---------+------+
| 1| Ana|Tribeca|check_out|1:00pm|
| 1| Ana| Soho| park|2:00pm|
| 1| Bob| Soho|check_out|3:00pm|
| 1| Bob|Chelsea| park|4:00pm|
| 2| Tom|Chelsea| park|4:00pm|
+-----+----+-------+---------+------+
您可以使用以下方式对数据进行分组:
import org.apache.spark.sql.functions.collect_list
df.select(struct('board, 'user).as("group"), struct('station, 'action, 'time).as('item))
.groupBy('group).agg(collect_list('item).as("items"))
.select(col("group.board"), struct(col("group.user"), col("items")).as("user_action"))
.groupBy('board).agg(collect_list('user_action))
结果将如下所示:
+-----+--------------------------------------------------------------------------------------------------------------------------+
|board|collect_list(user_action) |
+-----+--------------------------------------------------------------------------------------------------------------------------+
|1 |[[Ana, [[Tribeca, check_out, 1:00pm], [Soho, park, 2:00pm]]], [Bob, [[Soho, check_out, 3:00pm], [Chelsea, park, 4:00pm]]]]|
|2 |[[Tom, [[Chelsea, park, 4:00pm]]]] |
+-----+--------------------------------------------------------------------------------------------------------------------------+
这里的代码是用 Scala 编写的,在 Python 中几乎相同。
我有一个交易列表,其中用户将板从一个站点带到另一个站点。这是一个数组的数组,叫做trans
:
Board: User: Station: Action: Time:
[
['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'],
['1', 'Ana', 'Soho' , 'park' , '2:00pm'],
['1', 'Bob', 'Soho' , 'check_out', '3:00pm'],
['1', 'Bob', 'Chelsea', 'park' , '4:00pm'],
['2'...]
]
(例如,董事会 '1' 在 '1:00pm' 由 'Ana' 在 'Tribeca' 签出,在 '2:00pm' 停在 Soho,然后由 'Bob').
使用这行代码,我将每个板分组到其交易:
board_groups = trans
.map(lambda oneTrans:
(oneTrans[0],
[oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
.groupByKey()
.mapValues(list)
给予:
('1', [
['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
['Ana' , 'Soho' , 'park' , '2:00pm'],
['Bob' , 'Soho' , 'check_out', '3:00pm'],
['Bob' , 'Chelsea' , 'park' , '4:00pm' ]
]),
('2', ...)
我怎样才能做到这一点:
('1', ('Ana',
[
['Tribeca', 'check_out', '1:00pm'],
['Soho' , 'park' , '2:00pm']
]
),
('Bob',
[
['Soho' , 'check_out', '3:00pm'],
['Chelsea', 'park' , '4:00pm' ]
]
)
),
('2'...)
使用 Spark 的方法?
由于 board_groups
的每个值都是一个数组数组,我尝试在其上使用 mapValue
,这样对于每个值中的每个数组,我可以 map
到使每个用户成为一个密钥,然后将用户分组在一起。但是我不能这样做,因为这涉及嵌套并行化调用。
假设你的数据是
df.show
+-----+----+-------+---------+------+
|board|user|station| action| time|
+-----+----+-------+---------+------+
| 1| Ana|Tribeca|check_out|1:00pm|
| 1| Ana| Soho| park|2:00pm|
| 1| Bob| Soho|check_out|3:00pm|
| 1| Bob|Chelsea| park|4:00pm|
| 2| Tom|Chelsea| park|4:00pm|
+-----+----+-------+---------+------+
您可以使用以下方式对数据进行分组:
import org.apache.spark.sql.functions.collect_list
df.select(struct('board, 'user).as("group"), struct('station, 'action, 'time).as('item))
.groupBy('group).agg(collect_list('item).as("items"))
.select(col("group.board"), struct(col("group.user"), col("items")).as("user_action"))
.groupBy('board).agg(collect_list('user_action))
结果将如下所示:
+-----+--------------------------------------------------------------------------------------------------------------------------+
|board|collect_list(user_action) |
+-----+--------------------------------------------------------------------------------------------------------------------------+
|1 |[[Ana, [[Tribeca, check_out, 1:00pm], [Soho, park, 2:00pm]]], [Bob, [[Soho, check_out, 3:00pm], [Chelsea, park, 4:00pm]]]]|
|2 |[[Tom, [[Chelsea, park, 4:00pm]]]] |
+-----+--------------------------------------------------------------------------------------------------------------------------+
这里的代码是用 Scala 编写的,在 Python 中几乎相同。