Using Spark to merge 12 large dataframes together
Suppose I have a dataframe that looks like this:
import pandas as pd

d1 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[1,2,3],
'B':[4,5,6],
'C':[7,8,9],
}
df1 = pd.DataFrame(data=d1)
I also have another dataframe that looks like this:
d2 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2010','2010','2010'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[11,22,33],
'B':[44,55,66],
'C':[77,88,99],
'D': [10,20,30]
}
df2 = pd.DataFrame(data=d2)
I want to merge these two dataframes into one, stacking columns that exist in both and creating a new column otherwise, so that I get this combined dataframe:
d3 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500', 'Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009','2010','2010','2010'],
'state':['Alabama','Alabama','Alabama','Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3,3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County', 'Jefferson County','Jefferson County','Jefferson County'],
'A':[1,2,3,11,22,33],
'B':[4,5,6,44,55,66],
'C':[7,8,9,77,88,99],
'D':[None,None,None,10,20,30]
}
df_final = pd.DataFrame(data=d3)
All I need to do in pandas is this:
df = pd.concat([df1, df2]).reset_index(drop=True)
The problem is that when I spin up a very large EC2 instance, the run takes days, and the EC2 instance I have costs $10 an hour. I have AWS Developer Support and was advised to "use an EMR cluster instead of the Glue service, which gives you more control over the size and configuration of the Spark cluster."
I have zero experience with EMR, so I'd like to know the equivalent of the task I described in pandas. Specifically, what is the equivalent of df = pd.concat([df1, df2]).reset_index(drop=True) on EMR with Spark? Would it be something like union() or unionAll()? If possible, I'd like to see the concrete expression with these same dataframes so I can follow it.
You can use unionByName (note that the allowMissingColumns option requires Spark 3.1 or later):
from pyspark.sql import SparkSession

# Your own initialization here; on EMR a SparkSession
# is typically already provided as `spark`
spark = SparkSession.builder.getOrCreate()

# Convert the pandas dataframes to Spark dataframes
df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)
# Use unionByName
df_final = df1.unionByName(df2, allowMissingColumns=True)
Output:
>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
| Location+Type|Year| state|Census_tract| County_name| A| B| C| D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama| 3|Jefferson County| 1| 4| 7|null|
|Census Tract 4, J...|2009|Alabama| 3|Jefferson County| 2| 5| 8|null|
|Census Tract 5, J...|2009|Alabama| 3|Jefferson County| 3| 6| 9|null|
|Census Tract 3, J...|2010|Alabama| 3|Jefferson County| 11| 44| 77| 10|
|Census Tract 4, J...|2010|Alabama| 3|Jefferson County| 22| 55| 88| 20|
|Census Tract 5, J...|2010|Alabama| 3|Jefferson County| 33| 66| 99| 30|
+--------------------+----+-------+------------+----------------+---+---+---+----+
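For comparison, union() (unionAll() is just its alias) matches columns by position, not by name, and requires both dataframes to have the same number of columns, so it would fail on df1 and df2 as-is because of the extra D column. A minimal sketch of the manual workaround, just for illustration:

from pyspark.sql import functions as F

# union() matches columns by position, so both frames must have the
# same columns in the same order. Pad df1 with a null D column, then
# reorder it to df2's column order before the positional union.
df1_padded = df1.withColumn('D', F.lit(None).cast('long'))
df_union = df1_padded.select(df2.columns).union(df2)

unionByName avoids all of this bookkeeping by matching on column names directly.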
Update
Assuming there are 12 dataframes, how would I adjust the code?
from functools import reduce, partial
from pyspark.sql import DataFrame

# Convert each pandas dataframe to a Spark dataframe
# (only four are shown here; the pattern is the same for all 12)
df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)
df3 = spark.createDataFrame(df3)
df4 = spark.createDataFrame(df4)
dfs = [df1, df2, df3, df4]

# Pre-bind allowMissingColumns=True, then fold the union over the list
unionByName = partial(DataFrame.unionByName, allowMissingColumns=True)
df_final = reduce(unionByName, dfs)
Output:
>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
| Location+Type|Year| state|Census_tract| County_name| A| B| C| D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama| 3|Jefferson County| 1| 4| 7|null|
|Census Tract 4, J...|2009|Alabama| 3|Jefferson County| 2| 5| 8|null|
|Census Tract 5, J...|2009|Alabama| 3|Jefferson County| 3| 6| 9|null|
|Census Tract 3, J...|2010|Alabama| 3|Jefferson County| 11| 44| 77| 10|
|Census Tract 4, J...|2010|Alabama| 3|Jefferson County| 22| 55| 88| 20|
|Census Tract 5, J...|2010|Alabama| 3|Jefferson County| 33| 66| 99| 30|
|Census Tract 3, J...|2009|Alabama| 3|Jefferson County| 21| 24| 27|null|
|Census Tract 4, J...|2009|Alabama| 3|Jefferson County| 22| 25| 28|null|
|Census Tract 5, J...|2009|Alabama| 3|Jefferson County| 23| 26| 29|null|
|Census Tract 3, J...|2010|Alabama| 3|Jefferson County|111|144|177| 110|
|Census Tract 4, J...|2010|Alabama| 3|Jefferson County|122|155|188| 120|
|Census Tract 5, J...|2010|Alabama| 3|Jefferson County|133|166|199| 130|
+--------------------+----+-------+------------+----------------+---+---+---+----+
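With all 12 frames it is easier to keep them in a list. A minimal sketch, assuming pdfs is a hypothetical Python list holding your 12 pandas dataframes (in practice you may be able to skip pandas entirely and load each source with spark.read):

# pdfs is assumed to be a list of the 12 pandas dataframes;
# reduce and unionByName are defined as above
dfs = [spark.createDataFrame(pdf) for pdf in pdfs]
df_final = reduce(unionByName, dfs)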