Pyspark 频率计数
Pyspark frequency counts
我正在学习 PySpark map 和 reduce。我想要做的是使用 mapreduce 将 rdd 转换为基于每个 t 列 u 出现次数的频率计数。例如:
输入:
rdd = [u"(u't1', u'u1', 0.8)",
u"(u't1', u'u2', 0.1)",
u"(u't1', u'u3', 0.3)",
u"(u't1', u'u4', 0.4)",
u"(u't2', u'u1', 0.8)",
u"(u't2', u'u2', 0.3)"]
输出:
output= u"(u't1', u' u1', 0.8, 4)",
u"(u't1', u' u2', 0.1, 4)",
u"(u't1', u' u3', 0.3, 4)",
u"(u't1', u' u4', 0.4, 4)",
u"(u't2', u' u1', 0.8, 2)",
u"(u't2', u' u2', 0.3, 2)"]
我试过
rdd.map(lambda row: ((row[0], (row[1], row[2])), 1)).\
reduceByKey(lambda (a1,b1,c1),(a2,b2,c2): (a1+a2,b1+b2,c1+c2))
但 working.error 消息太多行无法解压。有什么建议么?谢谢
一种方法可能是
>>> rdd = sc.parallelize([('t1', 'u1', 0.8),
... ('t1', 'u2', 0.1),
... ('t1', 'u3', 0.3),
... ('t1', 'u4', 0.4),
... ('t2', 'u1', 0.8),
... ('t2', 'u2', 0.3)])
>>> rdd1 = rdd.map(lambda r: (r[0],(r[1],r[2])))
>>> rdd2 = sc.parallelize(rdd.map(lambda r: (r[0],(r[1],r[2]))).countByKey().items())
>>> rdd1.join(rdd2).map(lambda (a,((b,c),d)): (a,b,c,d)).collect()
[('t2', 'u1', 0.8, 2), ('t2', 'u2', 0.3, 2), ('t1', 'u1', 0.8, 4), ('t1', 'u2', 0.1, 4), ('t1', 'u3', 0.3, 4), ('t1', 'u4', 0.4, 4)]
我正在学习 PySpark map 和 reduce。我想要做的是使用 mapreduce 将 rdd 转换为基于每个 t 列 u 出现次数的频率计数。例如:
输入:
rdd = [u"(u't1', u'u1', 0.8)",
u"(u't1', u'u2', 0.1)",
u"(u't1', u'u3', 0.3)",
u"(u't1', u'u4', 0.4)",
u"(u't2', u'u1', 0.8)",
u"(u't2', u'u2', 0.3)"]
输出:
output= u"(u't1', u' u1', 0.8, 4)",
u"(u't1', u' u2', 0.1, 4)",
u"(u't1', u' u3', 0.3, 4)",
u"(u't1', u' u4', 0.4, 4)",
u"(u't2', u' u1', 0.8, 2)",
u"(u't2', u' u2', 0.3, 2)"]
我试过
rdd.map(lambda row: ((row[0], (row[1], row[2])), 1)).\
reduceByKey(lambda (a1,b1,c1),(a2,b2,c2): (a1+a2,b1+b2,c1+c2))
但 working.error 消息太多行无法解压。有什么建议么?谢谢
一种方法可能是
>>> rdd = sc.parallelize([('t1', 'u1', 0.8),
... ('t1', 'u2', 0.1),
... ('t1', 'u3', 0.3),
... ('t1', 'u4', 0.4),
... ('t2', 'u1', 0.8),
... ('t2', 'u2', 0.3)])
>>> rdd1 = rdd.map(lambda r: (r[0],(r[1],r[2])))
>>> rdd2 = sc.parallelize(rdd.map(lambda r: (r[0],(r[1],r[2]))).countByKey().items())
>>> rdd1.join(rdd2).map(lambda (a,((b,c),d)): (a,b,c,d)).collect()
[('t2', 'u1', 0.8, 2), ('t2', 'u2', 0.3, 2), ('t1', 'u1', 0.8, 4), ('t1', 'u2', 0.1, 4), ('t1', 'u3', 0.3, 4), ('t1', 'u4', 0.4, 4)]