在 groupby 之后将 Spark RDD 的行聚合到 String
Aggregate rows of Spark RDD to String after groupby
我有一个如下所示的 RDD,元组中的第一个条目是作者,第二个条目是出版物的标题。
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
我想将每位作者的出版物标题连接在一起。示例输出为:
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.''Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
鉴于这是一个 RDD,我该怎么做?我已经看到 了解如何使用数据帧执行此操作,但没有看到 RDD。
这是我尝试过的方法,但我怀疑问题出在我的使用方式上 reduceByKey
。 Pyspark SQL 函数 collect_list
适用于数据帧,但不幸的是我需要将此数据保存为 RDD。
title_author.map(lambda r: [(r[0],r[1]) for r[1] in r]).reduceByKey(add)
试试这个作为替代方案。
def listToString(s):
# initialize an empty string str1 += ele
str1 = ""
cnt = 0
# traverse in the string
for ele in s:
if cnt == 0:
str1 += "\'" + ele + "\'"
else:
str1 += "," + "\'" + ele + "\'"
cnt += 1
# return string
return str1
rdd = sc.sparkContext.parallelize (
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')] )
rdd2 = rdd.groupByKey().mapValues(sorted)
rdd2.take(5)
rdd3 = rdd2.map(lambda x: (x[0], listToString(x[1])))
rdd3.take(5)
您也可以这样做:
rdd4 = rdd2.reduceByKey(_ + _)
rdd4.take(5)
试试看最近的是什么。
我有一个如下所示的 RDD,元组中的第一个条目是作者,第二个条目是出版物的标题。
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
我想将每位作者的出版物标题连接在一起。示例输出为:
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.''Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
鉴于这是一个 RDD,我该怎么做?我已经看到
这是我尝试过的方法,但我怀疑问题出在我的使用方式上 reduceByKey
。 Pyspark SQL 函数 collect_list
适用于数据帧,但不幸的是我需要将此数据保存为 RDD。
title_author.map(lambda r: [(r[0],r[1]) for r[1] in r]).reduceByKey(add)
试试这个作为替代方案。
def listToString(s):
# initialize an empty string str1 += ele
str1 = ""
cnt = 0
# traverse in the string
for ele in s:
if cnt == 0:
str1 += "\'" + ele + "\'"
else:
str1 += "," + "\'" + ele + "\'"
cnt += 1
# return string
return str1
rdd = sc.sparkContext.parallelize (
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')] )
rdd2 = rdd.groupByKey().mapValues(sorted)
rdd2.take(5)
rdd3 = rdd2.map(lambda x: (x[0], listToString(x[1])))
rdd3.take(5)
您也可以这样做:
rdd4 = rdd2.reduceByKey(_ + _)
rdd4.take(5)
试试看最近的是什么。