部分总和的 PySpark RDD 处理

PySpark RDD processing for sum of parts

我有一个 RDD,其中包含像 (datetime, integer) 这样的元组。 我尝试用 pyspark 得到另一个 RDD 的一些区间求和。

例如,来自以下

(2015-09-30 10:00:01, 3)
(2015-09-30 10:00:02, 1)
(2015-09-30 10:00:05, 2)
(2015-09-30 10:00:06, 7)
(2015-09-30 10:00:07, 3)
(2015-09-30 10:00:10, 5)

我正在尝试每 3 秒获取以下总和:

(2015-09-30 10:00:01, 4)  # sum of 1, 2, 3 seconds
(2015-09-30 10:00:02, 1)  # sum of 2, 3, 4 seconds
(2015-09-30 10:00:05, 12) # sum of 5, 6, 7 seconds
(2015-09-30 10:00:06, 10) # sum of 6, 7, 8 seconds
(2015-09-30 10:00:07, 3)  # sum of 7, 8, 9 seconds
(2015-09-30 10:00:10, 5)  # sum of 10, 11, 12 seconds

拜托,你能给我一些提示吗?

我假设您的输入是一个 RDD time_rdd,其中第一个元素是日期时间对象,第二个元素是整数。您可以使用 flatMap 将每个日期时间对象映射到前 3 秒,然后使用 reduceByKey 获取该 window.

的总计数
def map_to_3_seconds(datetime_obj, count):
    list_times = []
    for i in range(-2, 1):
        list_times.append((datetime_obj + timedelta(seconds = i), count))
    return list_times

output_rdd = time_rdd.flatMap(lambda (datetime_obj, count): map_to_3_seconds(datetime_obj, count)).reduceByKey(lambda x,y: x+y)

此 RDD 将包含比原始 RDD 中更多的日期时间对象,因此如果您只想拥有原始时间,则需要与 time_rdd

result = output_rdd.join(time_rdd).map(lambda (key, vals): (key, vals[0])).collect()

现在结果将包含:

[(datetime.datetime(2015, 9, 30, 10, 0, 5), 12),
(datetime.datetime(2015, 9, 30, 10, 0, 2), 1),
(datetime.datetime(2015, 9, 30, 10, 0, 10), 5),
(datetime.datetime(2015, 9, 30, 10, 0, 1), 4),
(datetime.datetime(2015, 9, 30, 10, 0, 6), 10),
(datetime.datetime(2015, 9, 30, 10, 0, 7), 3)]