在一个 rdd 中执行和存储各种聚合

perform and store various aggregates in one rdd

我有这样的数据:

[('a',110),
 ('a',130),
 ('a',120),
 ('b',200),
 ('b',206)]

我想对键进行分组并对值执行 计数、平均值、最小值和最大值 以获得以下结果:

[('a', 3, 120, 110, 130),
 ('b', 2, 203, 200, 206)]

我大致知道如何使用 countByKey() 和 reduceByKey() 单独完成每个聚合,但我不确定如何将它们全部包含在一个 RDD 中。有帮助吗?

编辑:这是我真正的rdd

的片段
Out[16]: [('Alaska Airlines Inc.', 17.0),
 ('Alaska Airlines Inc.', 63.0),
 ('Alaska Airlines Inc.', 70.0),
 ('Alaska Airlines Inc.', 17.0),
 ('Alaska Airlines Inc.', 16.0),
 ('United Airlines', 9.0),
 ('United Airlines', 197.0),
 ('United Airlines', 115.0),
 ('United Airlines', 6.0),
 ('United Airlines', 1.0),

嗯,我设法通过使用 aggregateByKey 函数和 map 到 return 所需的“模式”得到了你的解决方案:

data = sc.parallelize([('a', 110), ('a', 120), ('a', 130), ('b', 200), ('b', 206)])


def sequence_operator(accumulator, element):
  return (accumulator[0] + 1,
         accumulator[1] + element, 
         min(accumulator[2], element),
         max(accumulator[3], element))


def combination_operator(current_accumulator, next_accumulator):
  return (current_accumulator[0] + next_accumulator[0],
         current_accumulator[1] + next_accumulator[1], 
         min(current_accumulator[2], next_accumulator[2]),
         max(current_accumulator[3], next_accumulator[3]))


def unpack_aggregations(data):
  key = data[0]
  count, total, minimum, maximum = data[1]
  return key, count, total / count, minimum, maximum


aggregations = data.aggregateByKey(zeroValue=(0, 0, float('inf'), float('-inf')), seqFunc=sequence_operator, combFunc=combination_operator)
mapped_data = aggregations.map(unpack_aggregations)
print(mapped_data.collect())

输出

[('a', 3, 120.0, 110, 130), ('b', 2, 203.0, 200, 206)]