使用 pyspark RDD .groupByKey 提取每组的最高值元素

Using pyspark RDD .groupByKey extract highest value element per group

TOKEN_RE = re.compile(r"\b[\w']+\b")
def pos_tag_counter(line):
    toks = nltk.regexp_tokenize(line.lower(), TOKEN_RE)
    postoks = nltk.tag.pos_tag(toks)
    return postoks

pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], (x[1], x[0][0]))) \
    .groupByKey().map(lambda x : (x[0], list(x[1])))

我有一个文本文件被减少为行,而不是单词,单词被计算并用 POS(词性)标签标记。所以我现在拥有的是一系列元组 (pos, (word, count))。 POS 是关键。我需要为每个 POS 找到最常见的词。

[('NN', (1884, 'washington')),
('NN', (5, 'stellar')),
('VBD', (563, 'kept')),
('DT', (435969, 'the')),
('JJ', (9300, 'first')),
('NN', (1256, 'half')),
('NN', (4028, 'season')),

这是我的第一个 pyspark 项目,所以我认为我没有完全掌握这个概念。我用了组

[('VBD',
[(563, 'kept'),
(56715, 'said'),
(2640, 'got'),
(12370, 's'),
(55523, 'was'),
(62, 'snapped'),

理想情况下,只要元组显示每个 POS 的最高计数单词,输出将是 - (POS, count, word) 任何顺序:

('NN', 1884, 'washington')
('DT', 435969, 'the')
等等

您不能将映射步骤更改为 map(lambda x: (x[0][1], x[1], x[0][0])) 即:

pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], x[1], x[0][0])) 

基本思路是groupByKey,然后找出每组的最大值。由于你需要最长的单词,你可以将max方法的关键定义为单词的长度。

rdd = sc.parallelize([('NN', (1884, 'washington')),
    ('NN', (5, 'stellar')),
    ('VBD', (563, 'kept')),
    ('DT', (435969, 'the')),
    ('JJ', (9300, 'first')),
    ('NN', (1256, 'half')),
    ('NN', (4028, 'season'))])

pos_count = rdd.groupByKey()
               .mapValues(lambda v: max(v, key=lambda x: len(x[1])))

print(pos_count.collect())
# [('DT', (435969, 'the')), ('VBD', (563, 'kept')), ('NN', (1884, 'washington')), ('JJ', (9300, 'first'))]