Pyspark:比较RDD的元素

Pyspark: comparing elements of RDD

与我发布的关于使用 DF 的问题 类似,如何检索每个序列中的第一个元素,但在这种情况下使用 RDD?我想将每一项与前一项进行比较。序列中后面重复的项目是可以接受的,即 (67,375, 14:20:14) 可能会出现在 RDD 的后面,应该保留。

输入


(67, 312, 12:09:00)
(67, 375, 12:23:00)  
(67, 375, 12:25:00)    
(67, 650, 12:26:00)
(75, 650, 12:27:00)  
(75, 650, 12:29:00)    
(75, 800, 12:30:00)    
(67, 375, 14:20:14)

输出



(67, 312, 12:09:00)
(67, 375, 12:23:00)  
(67, 650, 12:26:00)
(75, 650, 12:27:00)  
(75, 800, 12:30:00)  
(67, 375, 14:20:14)

这行得通。但是,我唯一担心的是,你不能依赖 rdd 上的转换将导致的输出顺序。因此,为了保留顺序,我强烈建议你按列排序,幸运的是你有时间戳列。

如果您不打算按时间戳排序,请使用数据框窗口方法。即使在那里,您可能也需要排序 :)

rdd = sc.parallelize([(67, 312, "12:09:00"),
(67, 375, "12:23:00"), 
(67, 375, "12:25:00"),   
(67, 650, "12:26:00"),
(75, 650, "12:27:00"), 
(75, 650, "12:29:00"),   
(75, 800, "12:30:00") ])

# Fix 1st two columns as keys.
rdd_fix_keys = rdd.map(lambda x:((x[0],x[1]),(x[2])))

# Group the values of similar keys.
rdd_group_by_key = rdd_regroup_keys.reduceByKey(lambda x,y:(x,y))

# Pick first occurence of the grouped values, as per your requirement.
rdd_pick_first_occurence = rdd_group_by_key.map(lambda x:(x[0], x[1][0]) if not isinstance(x[1], str) else x)

# Sort by timestamp.
rdd_pick_first_occurence.map(lambda x:(x[0][0],x[0][1],x[1])).sortBy(lambda x: x[2]).collect()

注:此处顺序有所改动