Pyspark 高效 map-reduce 算法对分布式系统中的子列表进行排序

Pyspark efficient map-reduce algorithm to sort sub-lists in a distributed system

给定一个 RDD:

+----------+----------+------------------------+
| a        | me       | [(1;1); (10;2); (5;3)] |
|          |          |                        |
| b        | dog      | [(1;3); (10;4); (2;4)] |
+----------+----------+------------------------+

我想要映射每一行,以便它的子数组按键(每个元组的第一个元素)排序,而不使用内置 Python 函数,因为这些不是分布式函数。子列表的大小也很大。排序后的输出将如下所示:

+----------+----------+------------------------+
| a        | me       | [(1;1); (5;3); (10;2)] |
|          |          |                        |
| b        | dog      | [(1;3); (2;4); (10;4)] |
+----------+----------+------------------------+

你可以处理分号“;”作为逗号。使用它们是为了让我使用的 table 生成器不会将数组拆分成多个部分。

原始输入:

(a,me,[(1,1),(10,2),(5,3)])
(b,dog,[(1,3),(10,4),(2,4)])

原始输出:

(a,me,[(1,1),(5,3),(10,2)])
(b,dog,[(1,3),(2,4),(10,4)])

目前我正在使用它和一个简单的 map() 调用来对子列表进行排序:

def sort_sublist(row):
    return (row[0], row[1], sorted(row[2], key=lambda tup: int(tup[0])))
...
my_rdd = my_rdd.map(lambda row: sort_sublist(row))

该函数使用了 Python 的非分布式 sorted() 函数。为了使 map-reduce 算法更高效,我需要找到一种方法来使用 Apache Spark 的函数(map()、reduce() 等)来完成上述函数中完成的工作。

一个想法:

我已经完成了以下伪代码:

[ (a,me,[(1,1),(5,3),(10,2)]),
(b,dog,[(1,3),(2,4),(10,4)]) ]

=> map =>

[ [(1,1),(5,3),(10,2)],
[(1,3),(2,4),(10,4)] ]

=> zipWithIndex =>

[ ([(1,1),(5,3),(10,2)], 0),
([(1,3),(2,4),(10,4)], 1) ]

=> flatMap =>

[ ( (0,(1,1)),(0,(5,3)),(0,(10,2)) ),
( (1,(1,3)),(1,(2,4)),(1,(10,4)) ) ]

这就是我 运行 遇到麻烦的地方。如果我使用字符串连接创建唯一的字符串键:

尝试 1:

=> map =>

[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]

=> key string concatenation =>

[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(12,(2,4)),(110,(10,4)) ]

=> sortByKey =>

[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(110,(10,4)),(12,(2,4)) ]

问题1:第二行的顺序不对。如果我使用整数创建唯一键:

尝试 2:

=> map =>

[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]

=> key integer sum =>

[ (1,(1,1)),(5,(5,3)),(10,(10,2)),
(2,(1,3)),(3,(2,4)),(11,(10,4)) ]

=> sortByKey =>

[ (1,(1,1)),(2,(1,3)),(3,(2,4)),
(5,(5,3)),(10,(10,2)),(11,(10,4)) ]

问题 2: 行的顺序在此过程中丢失。

问题的要点是找到一种方法来保持行的顺序,同时能够为该行中的每个键提供一个值来排序,这样每一行的元组对都按元组的键整数排序价值。我的方法可能不是解决方案。我也是 Apache Spark 的新手,所以希望对其内部工作原理有更多了解的人可以提供一些关于是否有办法实现这一目标的见解。

如果您正在寻找一种从最小值到最大值排序的算法

def sortList(x):
    currentlist = x
    newlist = []
    for i in range(len(currentlist)-1):
        newlist.append(min(currentlist))
    return newlist

如果这没有太大帮助,我很抱歉,但我希望这对您有所帮助!

自从我在 Spark 工作以来已经有很长一段时间了,但据我所知,sorted 不是分布式的,因为在 [=17] 内部调用时,它将按 RDD 分区应用=].

不过,如果您真的想避免 sorted,这里有一个相当笨拙的方法可以实现您的目标:

import pyspark
sc = pyspark.SparkContext() 

# load data
data = [('a','me',[(1,1),(10,2),(5,3)]),
        ('b','dog',[(1,3),(10,4),(2,4)])]
rdd = sc.parallelize(data)

# perform sorting
(rdd.map(lambda x: (x[0],x[1]))
    .zipWithIndex()
    .map(lambda x: (x[1],x[0]))
    .join(
        rdd.map(lambda x: x[2])
           .zipWithIndex()
           .flatMap(lambda x: [(x[1],y) for y in x[0]])
           .map(lambda x: (x[1][0], (x[1][1], x[0])))
           .sortByKey()
           .map(lambda x: (x[1][1], (x[0], x[1][0])))
           .groupByKey()
           .map(lambda x: (x[0], list(x[1])))
    )
    .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
    .collect()
)

join() 中的代码对元组的内部列表进行排序。 join() 和周围的代码用于将排序的元组连接回它们开始的字符串条目('a'、'me' 等)。

更新
回复一个关于效率的评论问题,sorted肯定比我上面提供的方案快。下面是一些要演示的示例数据,子列表中有 10,000 个元组:

import numpy as np
minval = 1
maxval = 11
N = 10000
tup_list1 = zip(np.random.randint(minval,maxval,N),
                np.random.randint(minval,maxval,N))
tup_list2 = zip(np.random.randint(minval,maxval,N),
                np.random.randint(minval,maxval,N))

data = [('a','me',tup_list1),
        ('b','dog',tup_list2)]
rdd = sc.parallelize(data)

没有排序,使用上述方法:

%timeit (rdd.map(lambda x: (x[0],x[1]))
            .zipWithIndex()
            .map(lambda x: (x[1],x[0]))
            .join(rdd.map(lambda x: x[2])
                     .zipWithIndex()
                     .flatMap(lambda x: [(x[1],y) for y in x[0]])
                     .map(lambda x: (x[1][0], (x[1][1],x[0]))) 
                     .sortByKey()
                     .map(lambda x: (x[1][1], (x[0],x[1][0])))
                     .groupByKey()
                     .map(lambda x: (x[0],list(x[1]))))
                     .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
                     .collect())

回复:

# The slowest run took 25.94 times longer than the fastest.  
# This could mean that an intermediate result is being cached.
# 1 loop, best of 3: 1.18 s per loop

OP的原始做法,使用sorted:

%timeit (rdd.map(lambda x: (x[0],x[1], sorted(x[2], key=lambda tup: int(tup[0]))))
            .collect())

回复:

# 1 loop, best of 3: 193 ms per loop

谨慎使用 cache() 可能会有一些加速,但是 sorted 仍然是这里更简单和更快的解决方案。

这并不是 map/reduce 范例的绝佳用例;在我的回答中,我有点强迫它。 Map/reduce当有很多重复键时更强大,并且可以申请有用的函数来聚合这些键各自的值。