在 groupby 之后将 Spark RDD 的行聚合到 String

Aggregate rows of Spark RDD to String after groupby

我有一个如下所示的 RDD,元组中的第一个条目是作者,第二个条目是出版物的标题。

[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
 ('Won Kim',
  'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]

我想将每位作者的出版物标题连接在一起。示例输出为:

[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.''Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]

鉴于这是一个 RDD,我该怎么做?我已经看到 了解如何使用数据帧执行此操作,但没有看到 RDD。

这是我尝试过的方法,但我怀疑问题出在我的使用方式上 reduceByKey。 Pyspark SQL 函数 collect_list 适用于数据帧,但不幸的是我需要将此数据保存为 RDD。

title_author.map(lambda r: [(r[0],r[1]) for r[1] in r]).reduceByKey(add)

试试这个作为替代方案。

   def listToString(s): 
    
    # initialize an empty string    str1 += ele
    str1 = "" 
    cnt = 0
    
    # traverse in the string  
    for ele in s: 
        if cnt == 0:
          str1 += "\'" + ele + "\'"
          
        else:
          str1 += "," + "\'" + ele + "\'"
          
        cnt += 1
        
    # return string  
    return str1
  
rdd = sc.sparkContext.parallelize (
[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
 ('Won Kim',
  'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')] )

rdd2 = rdd.groupByKey().mapValues(sorted)
rdd2.take(5)

rdd3 = rdd2.map(lambda x: (x[0], listToString(x[1]))) 
rdd3.take(5)  

您也可以这样做:

rdd4 = rdd2.reduceByKey(_ + _)
rdd4.take(5)

试试看最近的是什么。