Pyspark:RDD 和 "WHERE" 操作
Pyspark : RDD and "WHERE" operation
我正在学习如何使用 Python 处理 Spark RDD
,但我没有根据 rdd.filter()
和 where
条件找到解决方案。
我有一个如下所示的 CSV 文件:
id,firstname,city,age,job,salary,childen,awards
1, Yves, OLS-ET-RINHODES, 55, Pilote de chasse, 3395, 3, 3
2, Paul, MARTOT, 32, Pilote d'helicoptere, 2222, 4, 5
3, Steve, DIEULEFIT, 53, Navigateur aerien, 2152, 3, 2
4, Valentin, FEUILLADE, 27, Pilote de chasse, 1776, 0, 2
...
这是我的 python 脚本:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
#Context properties
conf = SparkConf().setAppName("Aeroport")
sc = SparkContext(conf=conf)
#Data Reading
data = sc.textFile("hdfs://master:9000/testfile.csv")
#Split each column
dataset = data.map(lambda l: l.split(','))
#Search children number by city
nbChildByCity = dataset.map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance : " + str(nbChildByCity.collect())
#Search children number by city with father > 50 years old
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
#nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
print "Nombre enfant par ville naissance avec père > 50 ans : " + str(nbChildByCityFather.collect())
我的问题是:#Search children number by city with father > 50 years old
我不克服添加最后一个条件:father > 50 years old
。我如何将 where
条件写入 RDD?
我试过了:
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
但是none结果..
使用数据框 API 更容易、更有效地实施(请参阅底部的替代方法)。
要获取行中年龄超过 50 岁的条目数,您首先需要进行过滤。您还需要在 reduce
调用中使用年龄列(索引 6):
城市儿童人数:
nbChildByCity = data.map(lambda row : (row[2], int(row[6].strip())))
#note that it's using child count, not 1
nbChildByCity.collect()
输出:
[(' OLS-ET-RINHODES', 3), (' MARTOT', 4), (' DIEULEFIT', 3), (' FEUILLADE', 0)]
相同,但 wi:
nbChildByCity50 = rdd.filter(lambda l: int(l[3]) > 50 )\
.map(lambda row : (row[2], int(row[6].strip()) ))\
.reduceByKey(lambda a,b:a+b)
print("Nombre enfant par ville naissance :" + str(nbChildByCity50.collect()))
输出:
Nombre enfant par ville naissance :[(' OLS-ET-RINHODES', 3), (' DIEULEFIT', 3)]
请注意,使用数据框 API:
更容易也更合适
df = spark.read.csv('cities.csv', header=True, inferSchema=True)
grp = df.groupBy(['city'])
grp.sum('childen').show()
给出:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| FEUILLADE| 0.0|
| MARTOT| 4.0|
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
并按年龄筛选:
grp = df.where('age > 50').groupBy(['city'])
grp.sum('childen').show()
输出:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
在应用 reduceByKey
之前,您应该先 filter
nbChildByCityFather = dataset.filter(lambda row : int(row[3].strip()) > 50).map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance avec pere > 50 ans : " + str(nbChildByCityFather.collect())
注意:此方法仅在您从 csv 文件中删除 header 行或以某种方式对其进行过滤时才有效。
我正在学习如何使用 Python 处理 Spark RDD
,但我没有根据 rdd.filter()
和 where
条件找到解决方案。
我有一个如下所示的 CSV 文件:
id,firstname,city,age,job,salary,childen,awards
1, Yves, OLS-ET-RINHODES, 55, Pilote de chasse, 3395, 3, 3
2, Paul, MARTOT, 32, Pilote d'helicoptere, 2222, 4, 5
3, Steve, DIEULEFIT, 53, Navigateur aerien, 2152, 3, 2
4, Valentin, FEUILLADE, 27, Pilote de chasse, 1776, 0, 2
...
这是我的 python 脚本:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
#Context properties
conf = SparkConf().setAppName("Aeroport")
sc = SparkContext(conf=conf)
#Data Reading
data = sc.textFile("hdfs://master:9000/testfile.csv")
#Split each column
dataset = data.map(lambda l: l.split(','))
#Search children number by city
nbChildByCity = dataset.map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance : " + str(nbChildByCity.collect())
#Search children number by city with father > 50 years old
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
#nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
print "Nombre enfant par ville naissance avec père > 50 ans : " + str(nbChildByCityFather.collect())
我的问题是:#Search children number by city with father > 50 years old
我不克服添加最后一个条件:father > 50 years old
。我如何将 where
条件写入 RDD?
我试过了:
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
但是none结果..
使用数据框 API 更容易、更有效地实施(请参阅底部的替代方法)。
要获取行中年龄超过 50 岁的条目数,您首先需要进行过滤。您还需要在 reduce
调用中使用年龄列(索引 6):
城市儿童人数:
nbChildByCity = data.map(lambda row : (row[2], int(row[6].strip())))
#note that it's using child count, not 1
nbChildByCity.collect()
输出:
[(' OLS-ET-RINHODES', 3), (' MARTOT', 4), (' DIEULEFIT', 3), (' FEUILLADE', 0)]
相同,但 wi:
nbChildByCity50 = rdd.filter(lambda l: int(l[3]) > 50 )\
.map(lambda row : (row[2], int(row[6].strip()) ))\
.reduceByKey(lambda a,b:a+b)
print("Nombre enfant par ville naissance :" + str(nbChildByCity50.collect()))
输出:
Nombre enfant par ville naissance :[(' OLS-ET-RINHODES', 3), (' DIEULEFIT', 3)]
请注意,使用数据框 API:
df = spark.read.csv('cities.csv', header=True, inferSchema=True)
grp = df.groupBy(['city'])
grp.sum('childen').show()
给出:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| FEUILLADE| 0.0|
| MARTOT| 4.0|
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
并按年龄筛选:
grp = df.where('age > 50').groupBy(['city'])
grp.sum('childen').show()
输出:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
在应用 reduceByKey
filter
nbChildByCityFather = dataset.filter(lambda row : int(row[3].strip()) > 50).map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance avec pere > 50 ans : " + str(nbChildByCityFather.collect())
注意:此方法仅在您从 csv 文件中删除 header 行或以某种方式对其进行过滤时才有效。