Using pyspark RDD .groupByKey to extract the highest-value element per group
import re
import nltk

TOKEN_RE = re.compile(r"\b[\w']+\b")

def pos_tag_counter(line):
    # Tokenize the lower-cased line, then tag each token with its part of speech
    toks = nltk.regexp_tokenize(line.lower(), TOKEN_RE)
    postoks = nltk.tag.pos_tag(toks)
    return postoks
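For reference, a quick sanity check of what pos_tag_counter returns (a hypothetical snippet, assuming the NLTK tagger model is available locally):

# One-time setup if the tagger data is not already present (assumption):
# nltk.download('averaged_perceptron_tagger')
sample = pos_tag_counter("The first half of the season")
print(sample)  # a list of (token, tag) pairs, e.g. [('the', 'DT'), ('first', 'JJ'), ('half', 'NN'), ...]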
# Tokenize and POS-tag each line, count each (word, tag) pair, re-key by the tag,
# then group so every POS maps to a list of (count, word) pairs.
pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], (x[1], x[0][0]))) \
    .groupByKey().map(lambda x: (x[0], list(x[1])))
I have a text file that has been reduced to lines, not words; the words have been counted and tagged with POS (part-of-speech) labels. So what I have now is a series of tuples (pos, (count, word)), where POS is the key. I need to find the most common word for each POS:
[('NN', (1884, 'washington')),
('NN', (5, 'stellar')),
('VBD', (563, 'kept')),
('DT', (435969, 'the')),
('JJ', (9300, 'first')),
('NN', (1256, 'half')),
('NN', (4028, 'season')),
This is my first pyspark project, so I don't think I have fully grasped the concept yet. I used groupByKey, which gives output like:
[('VBD',
[(563, 'kept'),
(56715, 'said'),
(2640, 'got'),
(12370, 's'),
(55523, 'was'),
(62, 'snapped'),
Ideally, the output would be (POS, count, word) tuples, in any order, as long as each tuple shows the highest-count word for that POS:
('NN', 1884, 'washington')
('DT', 435969, 'the')
and so on.
Could you not just change the map step to map(lambda x: (x[0][1], x[1], x[0][0]))? That is:
pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], x[1], x[0][0]))
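Note that this only reshapes each record into (POS, count, word); every word still produces its own row. If the goal is just the single highest-count word per POS, the grouping can be skipped entirely with a second reduceByKey; a minimal sketch (the name top_per_pos is illustrative):

# Sketch: keep only the highest-count (count, word) pair per POS tag,
# reducing by key instead of grouping (smaller shuffle than groupByKey).
top_per_pos = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], (x[1], x[0][0]))) \
    .reduceByKey(lambda a, b: max(a, b, key=lambda t: t[0])) \
    .map(lambda x: (x[0], x[1][0], x[1][1]))  # -> (POS, count, word)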
The basic idea is to groupByKey and then find the maximum of each group. Since you need the word with the highest count, you can use the count (the first element of each (count, word) pair) as the key for max.
rdd = sc.parallelize([('NN', (1884, 'washington')),
('NN', (5, 'stellar')),
('VBD', (563, 'kept')),
('DT', (435969, 'the')),
('JJ', (9300, 'first')),
('NN', (1256, 'half')),
('NN', (4028, 'season'))])
pos_count = rdd.groupByKey() \
    .mapValues(lambda v: max(v, key=lambda x: x[0]))  # x[0] is the count
print(pos_count.collect())
# [('DT', (435969, 'the')), ('VBD', (563, 'kept')), ('NN', (4028, 'season')), ('JJ', (9300, 'first'))]
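If the flat (POS, count, word) tuples from the question are wanted, one more map over the result does it; a small sketch (the name flat_pos_count is illustrative):

# Flatten (POS, (count, word)) into (POS, count, word)
flat_pos_count = pos_count.map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
print(flat_pos_count.collect())
# e.g. [('DT', 435969, 'the'), ('VBD', 563, 'kept'), ('NN', 4028, 'season'), ('JJ', 9300, 'first')]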