Pyspark:比较RDD的元素
Pyspark: comparing elements of RDD
与我发布的关于使用 DF 的问题 类似,如何检索每个序列中的第一个元素,但在这种情况下使用 RDD?我想将每一项与前一项进行比较。序列中后面重复的项目是可以接受的,即 (67,375, 14:20:14)
可能会出现在 RDD 的后面,应该保留。
输入
(67, 312, 12:09:00)
(67, 375, 12:23:00)
(67, 375, 12:25:00)
(67, 650, 12:26:00)
(75, 650, 12:27:00)
(75, 650, 12:29:00)
(75, 800, 12:30:00)
(67, 375, 14:20:14)
输出
(67, 312, 12:09:00)
(67, 375, 12:23:00)
(67, 650, 12:26:00)
(75, 650, 12:27:00)
(75, 800, 12:30:00)
(67, 375, 14:20:14)
这行得通。但是,我唯一担心的是,你不能依赖 rdd 上的转换将导致的输出顺序。因此,为了保留顺序,我强烈建议你按列排序,幸运的是你有时间戳列。
如果您不打算按时间戳排序,请使用数据框窗口方法。即使在那里,您可能也需要排序 :)
rdd = sc.parallelize([(67, 312, "12:09:00"),
(67, 375, "12:23:00"),
(67, 375, "12:25:00"),
(67, 650, "12:26:00"),
(75, 650, "12:27:00"),
(75, 650, "12:29:00"),
(75, 800, "12:30:00") ])
# Fix 1st two columns as keys.
rdd_fix_keys = rdd.map(lambda x:((x[0],x[1]),(x[2])))
# Group the values of similar keys.
rdd_group_by_key = rdd_regroup_keys.reduceByKey(lambda x,y:(x,y))
# Pick first occurence of the grouped values, as per your requirement.
rdd_pick_first_occurence = rdd_group_by_key.map(lambda x:(x[0], x[1][0]) if not isinstance(x[1], str) else x)
# Sort by timestamp.
rdd_pick_first_occurence.map(lambda x:(x[0][0],x[0][1],x[1])).sortBy(lambda x: x[2]).collect()
注:此处顺序有所改动
与我发布的关于使用 DF 的问题 (67,375, 14:20:14)
可能会出现在 RDD 的后面,应该保留。
输入
(67, 312, 12:09:00)
(67, 375, 12:23:00)
(67, 375, 12:25:00)
(67, 650, 12:26:00)
(75, 650, 12:27:00)
(75, 650, 12:29:00)
(75, 800, 12:30:00)
(67, 375, 14:20:14)
输出
(67, 312, 12:09:00)
(67, 375, 12:23:00)
(67, 650, 12:26:00)
(75, 650, 12:27:00)
(75, 800, 12:30:00)
(67, 375, 14:20:14)
这行得通。但是,我唯一担心的是,你不能依赖 rdd 上的转换将导致的输出顺序。因此,为了保留顺序,我强烈建议你按列排序,幸运的是你有时间戳列。
如果您不打算按时间戳排序,请使用数据框窗口方法。即使在那里,您可能也需要排序 :)
rdd = sc.parallelize([(67, 312, "12:09:00"),
(67, 375, "12:23:00"),
(67, 375, "12:25:00"),
(67, 650, "12:26:00"),
(75, 650, "12:27:00"),
(75, 650, "12:29:00"),
(75, 800, "12:30:00") ])
# Fix 1st two columns as keys.
rdd_fix_keys = rdd.map(lambda x:((x[0],x[1]),(x[2])))
# Group the values of similar keys.
rdd_group_by_key = rdd_regroup_keys.reduceByKey(lambda x,y:(x,y))
# Pick first occurence of the grouped values, as per your requirement.
rdd_pick_first_occurence = rdd_group_by_key.map(lambda x:(x[0], x[1][0]) if not isinstance(x[1], str) else x)
# Sort by timestamp.
rdd_pick_first_occurence.map(lambda x:(x[0][0],x[0][1],x[1])).sortBy(lambda x: x[2]).collect()
注:此处顺序有所改动