如何避免 Spark 中的嵌套地图调用?

How to avoid nested map calls in Spark?

我有一个交易列表,其中用户将板从一个站点带到另一个站点。这是一个数组的数组,叫做trans:

Board:  User:    Station:     Action:      Time:
[
 ['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'],
 ['1', 'Ana', 'Soho'   , 'park'     , '2:00pm'],
 ['1', 'Bob', 'Soho'   , 'check_out', '3:00pm'],
 ['1', 'Bob', 'Chelsea', 'park'     , '4:00pm'],
 ['2'...]
]

(例如,董事会 '1' 在 '1:00pm' 由 'Ana' 在 'Tribeca' 签出,在 '2:00pm' 停在 Soho,然后由 'Bob').

使用这行代码,我将每个板分组到其交易:

board_groups = trans
    .map(lambda oneTrans: 
        (oneTrans[0], 
        [oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
    .groupByKey()
    .mapValues(list)

给予:

('1', [
       ['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
       ['Ana' , 'Soho'    , 'park'     , '2:00pm'],
       ['Bob' , 'Soho'    , 'check_out', '3:00pm'],
       ['Bob' , 'Chelsea' , 'park'     , '4:00pm' ]
      ]),
('2', ...)

我怎样才能做到这一点:

('1', ('Ana',
       [
         ['Tribeca', 'check_out', '1:00pm'],
         ['Soho'   , 'park'     , '2:00pm']
       ]
      ),

      ('Bob',
       [
         ['Soho'   , 'check_out', '3:00pm'],
         ['Chelsea', 'park'     , '4:00pm' ]
       ]
      )
),
('2'...)

使用 Spark 的方法?

由于 board_groups 的每个值都是一个数组数组,我尝试在其上使用 mapValue,这样对于每个值中的每个数组,我可以 map 到使每个用户成为一个密钥,然后将用户分组在一起。但是我不能这样做,因为这涉及嵌套并行化调用。

假设你的数据是

df.show

+-----+----+-------+---------+------+
|board|user|station|   action|  time|
+-----+----+-------+---------+------+
|    1| Ana|Tribeca|check_out|1:00pm|
|    1| Ana|   Soho|     park|2:00pm|
|    1| Bob|   Soho|check_out|3:00pm|
|    1| Bob|Chelsea|     park|4:00pm|
|    2| Tom|Chelsea|     park|4:00pm|
+-----+----+-------+---------+------+

您可以使用以下方式对数据进行分组:

import org.apache.spark.sql.functions.collect_list

df.select(struct('board, 'user).as("group"), struct('station, 'action, 'time).as('item))
    .groupBy('group).agg(collect_list('item).as("items"))
    .select(col("group.board"), struct(col("group.user"), col("items")).as("user_action"))
    .groupBy('board).agg(collect_list('user_action))

结果将如下所示:

+-----+--------------------------------------------------------------------------------------------------------------------------+
|board|collect_list(user_action)                                                                                                 |
+-----+--------------------------------------------------------------------------------------------------------------------------+
|1    |[[Ana, [[Tribeca, check_out, 1:00pm], [Soho, park, 2:00pm]]], [Bob, [[Soho, check_out, 3:00pm], [Chelsea, park, 4:00pm]]]]|
|2    |[[Tom, [[Chelsea, park, 4:00pm]]]]                                                                                        |
+-----+--------------------------------------------------------------------------------------------------------------------------+

这里的代码是用 Scala 编写的,在 Python 中几乎相同。