Pyspark: Calculating the sum of two corresponding columns, based on conditions of two columns in two RDDs
I have two RDDs with the same columns:
rdd1:-
+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m1| u1|        1|
| m1| u2|        1|
| m2| u1|        2|
+---+---+---------+
rdd2:-
+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m1| u1|       10|
| m2| u1|       98|
| m3| u2|       21|
+---+---+---------+
I want to calculate the sum of the frequencies based on mid and uid. The result should look like this:
+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m1| u1|       11|
| m2| u1|      100|
| m3| u2|       21|
+---+---+---------+
Thanks in advance.
Edit:
I also implemented a solution in the following way (using map-reduce):
from pyspark.sql.functions import col

data1 = [("m1", "u1", 1), ("m1", "u2", 1), ("m2", "u1", 2)]
data2 = [("m1", "u1", 10), ("m2", "u1", 98), ("m3", "u2", 21)]
df1 = sqlContext.createDataFrame(data1, ['mid', 'uid', 'frequency'])
df2 = sqlContext.createDataFrame(data2, ['mid', 'uid', 'frequency'])

# Stack the two DataFrames, then aggregate on the underlying records
df3 = df1.unionAll(df2)
df4 = df3.map(lambda row: ((row['mid'], row['uid']), int(row['frequency']))) \
         .reduceByKey(lambda a, b: a + b)   # sum frequencies per (mid, uid)
# Note: DataFrame.map is Spark 1.x only; on Spark 2.x+ use df3.rdd.map instead

# Flatten the ((mid, uid), frequency) pairs back into a DataFrame with named columns
p = df4.map(lambda kv: (kv[0][0], kv[0][1], kv[1])).toDF()
p = p.select(col("_1").alias("mid"),
             col("_2").alias("uid"),
             col("_3").alias("frequency"))
p.show()
Output:
+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m2| u1| 100|
| m1| u1| 11|
| m1| u2| 1|
| m3| u2| 21|
+---+---+---------+
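As an aside, since the question is phrased in terms of RDDs, the same aggregation can also be done entirely with the RDD API. Below is a minimal sketch (not from the original post), assuming an existing SparkContext named sc; the variable names and sample data are only illustrative:

rdd1 = sc.parallelize([("m1", "u1", 1), ("m1", "u2", 1), ("m2", "u1", 2)])
rdd2 = sc.parallelize([("m1", "u1", 10), ("m2", "u1", 98), ("m3", "u2", 21)])

summed = (rdd1.union(rdd2)                               # stack the two RDDs
          .map(lambda row: ((row[0], row[1]), row[2]))   # key by (mid, uid)
          .reduceByKey(lambda a, b: a + b)               # sum frequencies per key
          .map(lambda kv: (kv[0][0], kv[0][1], kv[1])))  # back to (mid, uid, frequency)

print(summed.collect())   # order of the tuples may vary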
You just need to group by mid and uid and sum the frequencies:
data1 = [("m1", "u1", 1), ("m1", "u2", 1), ("m2", "u1", 2)]
data2 = [("m1", "u1", 10), ("m2", "u1", 98), ("m3", "u2", 21)]
df1 = sqlContext.createDataFrame(data1, ['mid', 'uid', 'frequency'])
df2 = sqlContext.createDataFrame(data2, ['mid', 'uid', 'frequency'])

# Stack the two DataFrames, group by (mid, uid), and sum the frequencies
df3 = df1.unionAll(df2)
df4 = df3.groupBy(df3.mid, df3.uid).sum() \
         .withColumnRenamed("sum(frequency)", "frequency")
df4.show()
# +---+---+---------+
# |mid|uid|frequency|
# +---+---+---------+
# | m1| u1| 11|
# | m1| u2| 1|
# | m2| u1| 100|
# | m3| u2| 21|
# +---+---+---------+
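For reference, sqlContext, unionAll, and DataFrame.map belong to the Spark 1.x API. On Spark 2.x and later the same grouping is usually written against a SparkSession with an explicit aggregation. A minimal sketch (not part of the original answer), assuming a SparkSession named spark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data1 = [("m1", "u1", 1), ("m1", "u2", 1), ("m2", "u1", 2)]
data2 = [("m1", "u1", 10), ("m2", "u1", 98), ("m3", "u2", 21)]
df1 = spark.createDataFrame(data1, ['mid', 'uid', 'frequency'])
df2 = spark.createDataFrame(data2, ['mid', 'uid', 'frequency'])

# union replaces the deprecated unionAll; agg + alias names the sum column directly
result = (df1.union(df2)
          .groupBy("mid", "uid")
          .agg(F.sum("frequency").alias("frequency")))
result.show()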