部分总和的 PySpark RDD 处理
PySpark RDD processing for sum of parts
我有一个 RDD,其中包含像 (datetime, integer) 这样的元组。
我尝试用 pyspark 得到另一个 RDD 的一些区间求和。
例如,来自以下
(2015-09-30 10:00:01, 3)
(2015-09-30 10:00:02, 1)
(2015-09-30 10:00:05, 2)
(2015-09-30 10:00:06, 7)
(2015-09-30 10:00:07, 3)
(2015-09-30 10:00:10, 5)
我正在尝试每 3 秒获取以下总和:
(2015-09-30 10:00:01, 4) # sum of 1, 2, 3 seconds
(2015-09-30 10:00:02, 1) # sum of 2, 3, 4 seconds
(2015-09-30 10:00:05, 12) # sum of 5, 6, 7 seconds
(2015-09-30 10:00:06, 10) # sum of 6, 7, 8 seconds
(2015-09-30 10:00:07, 3) # sum of 7, 8, 9 seconds
(2015-09-30 10:00:10, 5) # sum of 10, 11, 12 seconds
拜托,你能给我一些提示吗?
我假设您的输入是一个 RDD time_rdd
,其中第一个元素是日期时间对象,第二个元素是整数。您可以使用 flatMap 将每个日期时间对象映射到前 3 秒,然后使用 reduceByKey 获取该 window.
的总计数
def map_to_3_seconds(datetime_obj, count):
list_times = []
for i in range(-2, 1):
list_times.append((datetime_obj + timedelta(seconds = i), count))
return list_times
output_rdd = time_rdd.flatMap(lambda (datetime_obj, count): map_to_3_seconds(datetime_obj, count)).reduceByKey(lambda x,y: x+y)
此 RDD 将包含比原始 RDD 中更多的日期时间对象,因此如果您只想拥有原始时间,则需要与 time_rdd
、
result = output_rdd.join(time_rdd).map(lambda (key, vals): (key, vals[0])).collect()
现在结果将包含:
[(datetime.datetime(2015, 9, 30, 10, 0, 5), 12),
(datetime.datetime(2015, 9, 30, 10, 0, 2), 1),
(datetime.datetime(2015, 9, 30, 10, 0, 10), 5),
(datetime.datetime(2015, 9, 30, 10, 0, 1), 4),
(datetime.datetime(2015, 9, 30, 10, 0, 6), 10),
(datetime.datetime(2015, 9, 30, 10, 0, 7), 3)]
我有一个 RDD,其中包含像 (datetime, integer) 这样的元组。 我尝试用 pyspark 得到另一个 RDD 的一些区间求和。
例如,来自以下
(2015-09-30 10:00:01, 3)
(2015-09-30 10:00:02, 1)
(2015-09-30 10:00:05, 2)
(2015-09-30 10:00:06, 7)
(2015-09-30 10:00:07, 3)
(2015-09-30 10:00:10, 5)
我正在尝试每 3 秒获取以下总和:
(2015-09-30 10:00:01, 4) # sum of 1, 2, 3 seconds
(2015-09-30 10:00:02, 1) # sum of 2, 3, 4 seconds
(2015-09-30 10:00:05, 12) # sum of 5, 6, 7 seconds
(2015-09-30 10:00:06, 10) # sum of 6, 7, 8 seconds
(2015-09-30 10:00:07, 3) # sum of 7, 8, 9 seconds
(2015-09-30 10:00:10, 5) # sum of 10, 11, 12 seconds
拜托,你能给我一些提示吗?
我假设您的输入是一个 RDD time_rdd
,其中第一个元素是日期时间对象,第二个元素是整数。您可以使用 flatMap 将每个日期时间对象映射到前 3 秒,然后使用 reduceByKey 获取该 window.
def map_to_3_seconds(datetime_obj, count):
list_times = []
for i in range(-2, 1):
list_times.append((datetime_obj + timedelta(seconds = i), count))
return list_times
output_rdd = time_rdd.flatMap(lambda (datetime_obj, count): map_to_3_seconds(datetime_obj, count)).reduceByKey(lambda x,y: x+y)
此 RDD 将包含比原始 RDD 中更多的日期时间对象,因此如果您只想拥有原始时间,则需要与 time_rdd
、
result = output_rdd.join(time_rdd).map(lambda (key, vals): (key, vals[0])).collect()
现在结果将包含:
[(datetime.datetime(2015, 9, 30, 10, 0, 5), 12),
(datetime.datetime(2015, 9, 30, 10, 0, 2), 1),
(datetime.datetime(2015, 9, 30, 10, 0, 10), 5),
(datetime.datetime(2015, 9, 30, 10, 0, 1), 4),
(datetime.datetime(2015, 9, 30, 10, 0, 6), 10),
(datetime.datetime(2015, 9, 30, 10, 0, 7), 3)]