Reshaping/Pivoting Spark RDD 中的数据 and/or Spark DataFrames
Reshaping/Pivoting data in Spark RDD and/or Spark DataFrames
我有一些以下格式的数据(RDD 或 Spark DataFrame):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
# convert to a Spark DataFrame
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
我想做的是 'reshape' 数据,将国家(特别是美国、英国和加拿大)中的某些行转换为列:
ID Age US UK CA
'X01' 41 3 1 2
'X02' 72 4 6 7
基本上,我需要符合 Python 的 pivot
工作流程的内容:
categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID',
columns = 'Country',
values = 'Score')
我的数据集相当大,所以我无法真正 collect()
并将数据提取到内存中以在 Python 中进行重塑。有没有办法在映射 RDD 或 Spark DataFrame 时将 Python 的 .pivot()
转换为可调用函数?如有任何帮助,我们将不胜感激!
所以首先,我必须对您的 RDD(与您的实际输出匹配)进行此更正:
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
我做了那个更正后,就成功了:
df.select($"ID", $"Age").groupBy($"ID").agg($"ID", first($"Age") as "Age")
.join(
df.select($"ID" as "usID", $"Country" as "C1",$"Score" as "US"),
$"ID" === $"usID" and $"C1" === "US"
)
.join(
df.select($"ID" as "ukID", $"Country" as "C2",$"Score" as "UK"),
$"ID" === $"ukID" and $"C2" === "UK"
)
.join(
df.select($"ID" as "caID", $"Country" as "C3",$"Score" as "CA"),
$"ID" === $"caID" and $"C3" === "CA"
)
.select($"ID",$"Age",$"US",$"UK",$"CA")
肯定没有你的枢轴那么优雅。
首先,这可能不是一个好主意,因为您没有获得任何额外信息,但您将自己与固定模式绑定(即您必须需要知道您期望有多少个国家,以及当然,额外的国家意味着代码的变化)
话说回来,这是一个SQL的问题,如下图。但是如果你觉得不太"software like"(说真的,我听说过这个!!),那么你可以参考第一个解决方案。
解决方案 1:
def reshape(t):
out = []
out.append(t[0])
out.append(t[1])
for v in brc.value:
if t[2] == v:
out.append(t[3])
else:
out.append(0)
return (out[0],out[1]),(out[2],out[3],out[4],out[5])
def cntryFilter(t):
if t[2] in brc.value:
return t
else:
pass
def addtup(t1,t2):
j=()
for k,v in enumerate(t1):
j=j+(t1[k]+t2[k],)
return j
def seq(tIntrm,tNext):
return addtup(tIntrm,tNext)
def comb(tP,tF):
return addtup(tP,tF)
countries = ['CA', 'UK', 'US', 'XX']
brc = sc.broadcast(countries)
reshaped = calls.filter(cntryFilter).map(reshape)
pivot = reshaped.aggregateByKey((0,0,0,0),seq,comb,1)
for i in pivot.collect():
print i
现在,解决方案 2:当然更好,因为 SQL 是解决此问题的正确工具
callRow = calls.map(lambda t:
Row(userid=t[0],age=int(t[1]),country=t[2],nbrCalls=t[3]))
callsDF = ssc.createDataFrame(callRow)
callsDF.printSchema()
callsDF.registerTempTable("calls")
res = ssc.sql("select userid,age,max(ca),max(uk),max(us),max(xx)\
from (select userid,age,\
case when country='CA' then nbrCalls else 0 end ca,\
case when country='UK' then nbrCalls else 0 end uk,\
case when country='US' then nbrCalls else 0 end us,\
case when country='XX' then nbrCalls else 0 end xx \
from calls) x \
group by userid,age")
res.show()
数据设置:
data=[('X01',41,'US',3),('X01',41,'UK',1),('X01',41,'CA',2),('X02',72,'US',4),('X02',72,'UK',6),('X02',72,'CA',7),('X02',72,'XX',8)]
calls = sc.parallelize(data,1)
countries = ['CA', 'UK', 'US', 'XX']
结果:
来自第一个解决方案
(('X02', 72), (7, 6, 4, 8))
(('X01', 41), (2, 1, 3, 0))
来自第二个解决方案:
root |-- age: long (nullable = true)
|-- country: string (nullable = true)
|-- nbrCalls: long (nullable = true)
|-- userid: string (nullable = true)
userid age ca uk us xx
X02 72 7 6 4 8
X01 41 2 1 3 0
请告诉我这是否有效:)
最佳
阿燕
这是一种不硬连接列名称的本机 Spark 方法。它基于 aggregateByKey
,并使用字典来收集每个键出现的列。然后我们收集所有的列名来创建最终的数据框。 [以前的版本在为每条记录发出字典后使用 jsonRDD,但这更有效。]限制到特定的列列表,或排除像 XX
这样的列将是一个简单的修改。
即使在相当大的表上,性能似乎也不错。我正在使用一种变体,它计算每个 ID 的可变数量事件中的每一个发生的次数,为每个事件类型生成一列。代码基本相同,只是它在 seqFn
中使用 collections.Counter 而不是字典来计算出现次数。
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
return u
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
df
.rdd
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c) for c in columns]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns]
)
)
result.show()
产生:
ID CA UK US XX
X02 7 6 4 8
X01 2 1 3 null
在 Hive 中有一个 JIRA for PIVOT 可以在本地执行此操作,而无需为每个值使用巨大的 CASE 语句:
https://issues.apache.org/jira/browse/HIVE-3776
请为 JIRA 投票,以便尽快实施。
一旦进入HiveSQL,Spark通常不会落后太多,最终也会在Spark中实现。
只是对 patricksurry 非常有帮助的答案的一些评论:
- 缺少年龄列,所以只需将 u["Age"] = v.Age 添加到函数 seqPivot
- 事实证明,对列元素的两次循环都以不同的顺序给出了元素。列的值是正确的,但不是它们的名称。要避免这种行为,只需对列列表进行排序。
这里是稍微修改过的代码:
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
# u is a dictionarie
# v is a Row
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
# In the original posting the Age column was not specified
u["Age"] = v.Age
return u
# u1
# u2
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
rdd
.map(lambda row: Row(ID=row[0], Age=row[1], Country=row[2], Score=row[3]))
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
columns_ord = sorted(columns)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c, None) for c in columns_ord]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns_ord]
)
)
print result.show()
最后,输出应该是
+---+---+---+---+---+----+
| ID|Age| CA| UK| US| XX|
+---+---+---+---+---+----+
|X02| 72| 7| 6| 4| 8|
|X01| 41| 2| 1| 3|null|
+---+---+---+---+---+----+
从 Spark 1.6 开始,您可以在 GroupedData
上使用 pivot
函数并提供聚合表达式。
pivoted = (df
.groupBy("ID", "Age")
.pivot(
"Country",
['US', 'UK', 'CA']) # Optional list of levels
.sum("Score")) # alternatively you can use .agg(expr))
pivoted.show()
## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41| 3| 1| 2|
## |X02| 72| 4| 6| 7|
## +---+---+---+---+---+
级别可以省略,但如果提供,既可以提高性能又可以用作内部过滤器。
这种方法仍然相对较慢,但肯定胜过在 JVM 和 Python 之间手动传递数据。
我有一些以下格式的数据(RDD 或 Spark DataFrame):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
# convert to a Spark DataFrame
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
我想做的是 'reshape' 数据,将国家(特别是美国、英国和加拿大)中的某些行转换为列:
ID Age US UK CA
'X01' 41 3 1 2
'X02' 72 4 6 7
基本上,我需要符合 Python 的 pivot
工作流程的内容:
categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID',
columns = 'Country',
values = 'Score')
我的数据集相当大,所以我无法真正 collect()
并将数据提取到内存中以在 Python 中进行重塑。有没有办法在映射 RDD 或 Spark DataFrame 时将 Python 的 .pivot()
转换为可调用函数?如有任何帮助,我们将不胜感激!
所以首先,我必须对您的 RDD(与您的实际输出匹配)进行此更正:
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
我做了那个更正后,就成功了:
df.select($"ID", $"Age").groupBy($"ID").agg($"ID", first($"Age") as "Age")
.join(
df.select($"ID" as "usID", $"Country" as "C1",$"Score" as "US"),
$"ID" === $"usID" and $"C1" === "US"
)
.join(
df.select($"ID" as "ukID", $"Country" as "C2",$"Score" as "UK"),
$"ID" === $"ukID" and $"C2" === "UK"
)
.join(
df.select($"ID" as "caID", $"Country" as "C3",$"Score" as "CA"),
$"ID" === $"caID" and $"C3" === "CA"
)
.select($"ID",$"Age",$"US",$"UK",$"CA")
肯定没有你的枢轴那么优雅。
首先,这可能不是一个好主意,因为您没有获得任何额外信息,但您将自己与固定模式绑定(即您必须需要知道您期望有多少个国家,以及当然,额外的国家意味着代码的变化)
话说回来,这是一个SQL的问题,如下图。但是如果你觉得不太"software like"(说真的,我听说过这个!!),那么你可以参考第一个解决方案。
解决方案 1:
def reshape(t):
out = []
out.append(t[0])
out.append(t[1])
for v in brc.value:
if t[2] == v:
out.append(t[3])
else:
out.append(0)
return (out[0],out[1]),(out[2],out[3],out[4],out[5])
def cntryFilter(t):
if t[2] in brc.value:
return t
else:
pass
def addtup(t1,t2):
j=()
for k,v in enumerate(t1):
j=j+(t1[k]+t2[k],)
return j
def seq(tIntrm,tNext):
return addtup(tIntrm,tNext)
def comb(tP,tF):
return addtup(tP,tF)
countries = ['CA', 'UK', 'US', 'XX']
brc = sc.broadcast(countries)
reshaped = calls.filter(cntryFilter).map(reshape)
pivot = reshaped.aggregateByKey((0,0,0,0),seq,comb,1)
for i in pivot.collect():
print i
现在,解决方案 2:当然更好,因为 SQL 是解决此问题的正确工具
callRow = calls.map(lambda t:
Row(userid=t[0],age=int(t[1]),country=t[2],nbrCalls=t[3]))
callsDF = ssc.createDataFrame(callRow)
callsDF.printSchema()
callsDF.registerTempTable("calls")
res = ssc.sql("select userid,age,max(ca),max(uk),max(us),max(xx)\
from (select userid,age,\
case when country='CA' then nbrCalls else 0 end ca,\
case when country='UK' then nbrCalls else 0 end uk,\
case when country='US' then nbrCalls else 0 end us,\
case when country='XX' then nbrCalls else 0 end xx \
from calls) x \
group by userid,age")
res.show()
数据设置:
data=[('X01',41,'US',3),('X01',41,'UK',1),('X01',41,'CA',2),('X02',72,'US',4),('X02',72,'UK',6),('X02',72,'CA',7),('X02',72,'XX',8)]
calls = sc.parallelize(data,1)
countries = ['CA', 'UK', 'US', 'XX']
结果:
来自第一个解决方案
(('X02', 72), (7, 6, 4, 8))
(('X01', 41), (2, 1, 3, 0))
来自第二个解决方案:
root |-- age: long (nullable = true)
|-- country: string (nullable = true)
|-- nbrCalls: long (nullable = true)
|-- userid: string (nullable = true)
userid age ca uk us xx
X02 72 7 6 4 8
X01 41 2 1 3 0
请告诉我这是否有效:)
最佳 阿燕
这是一种不硬连接列名称的本机 Spark 方法。它基于 aggregateByKey
,并使用字典来收集每个键出现的列。然后我们收集所有的列名来创建最终的数据框。 [以前的版本在为每条记录发出字典后使用 jsonRDD,但这更有效。]限制到特定的列列表,或排除像 XX
这样的列将是一个简单的修改。
即使在相当大的表上,性能似乎也不错。我正在使用一种变体,它计算每个 ID 的可变数量事件中的每一个发生的次数,为每个事件类型生成一列。代码基本相同,只是它在 seqFn
中使用 collections.Counter 而不是字典来计算出现次数。
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
return u
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
df
.rdd
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c) for c in columns]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns]
)
)
result.show()
产生:
ID CA UK US XX
X02 7 6 4 8
X01 2 1 3 null
在 Hive 中有一个 JIRA for PIVOT 可以在本地执行此操作,而无需为每个值使用巨大的 CASE 语句:
https://issues.apache.org/jira/browse/HIVE-3776
请为 JIRA 投票,以便尽快实施。 一旦进入HiveSQL,Spark通常不会落后太多,最终也会在Spark中实现。
只是对 patricksurry 非常有帮助的答案的一些评论:
- 缺少年龄列,所以只需将 u["Age"] = v.Age 添加到函数 seqPivot
- 事实证明,对列元素的两次循环都以不同的顺序给出了元素。列的值是正确的,但不是它们的名称。要避免这种行为,只需对列列表进行排序。
这里是稍微修改过的代码:
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
# u is a dictionarie
# v is a Row
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
# In the original posting the Age column was not specified
u["Age"] = v.Age
return u
# u1
# u2
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
rdd
.map(lambda row: Row(ID=row[0], Age=row[1], Country=row[2], Score=row[3]))
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
columns_ord = sorted(columns)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c, None) for c in columns_ord]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns_ord]
)
)
print result.show()
最后,输出应该是
+---+---+---+---+---+----+
| ID|Age| CA| UK| US| XX|
+---+---+---+---+---+----+
|X02| 72| 7| 6| 4| 8|
|X01| 41| 2| 1| 3|null|
+---+---+---+---+---+----+
从 Spark 1.6 开始,您可以在 GroupedData
上使用 pivot
函数并提供聚合表达式。
pivoted = (df
.groupBy("ID", "Age")
.pivot(
"Country",
['US', 'UK', 'CA']) # Optional list of levels
.sum("Score")) # alternatively you can use .agg(expr))
pivoted.show()
## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41| 3| 1| 2|
## |X02| 72| 4| 6| 7|
## +---+---+---+---+---+
级别可以省略,但如果提供,既可以提高性能又可以用作内部过滤器。
这种方法仍然相对较慢,但肯定胜过在 JVM 和 Python 之间手动传递数据。