Using spark to merge 12 large dataframes together

Suppose I have a dataframe that looks like this:

import pandas as pd

d1 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[1,2,3],
'B':[4,5,6],
'C':[7,8,9],
}
df1 = pd.DataFrame(data=d1)

I also have another dataframe like this:

d2 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2010','2010','2010'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[11,22,33],
'B':[44,55,66],
'C':[77,88,99],
'D': [10,20,30]
}
df2 = pd.DataFrame(data=d2)

I want to merge these two dataframes into one, stacking the columns they share and creating a new column for any column that exists in only one of them, which would give me this combined dataframe:

d3 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500', 'Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009','2010','2010','2010'],
'state':['Alabama','Alabama','Alabama','Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3,3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County', 'Jefferson County','Jefferson County','Jefferson County'],
'A':[1,2,3,11,22,33],
'B':[4,5,6,44,55,66],
'C':[7,8,9,77,88,99],
'D':[None,None,None,10,20,30]
}

df_final = pd.DataFrame(data=d3)

With pandas, all I need to do is this:

df = pd.concat([df1, df2]).reset_index(drop=True)

The problem is that even when I spin up a very large EC2 instance, the run takes days, and the EC2 instance I'm using bills at $10 per hour. I have AWS Developer Support and was recommended to "use an EMR cluster rather than the Glue service, which gives you more control over the size and configuration of the Spark cluster."

I have zero experience with EMR, so I would like to know the equivalent expression for the task I described above with pandas.

Specifically, what is the equivalent of df = pd.concat([df1, df2]).reset_index(drop=True) in EMR using Spark?

Would it be something like union() or unionAll()? If possible, I would like to see the concrete expression with these same dataframes so I can understand it.

You can use unionByName. Unlike union() and unionAll(), which match columns by position and require both dataframes to have the same number of columns, unionByName matches columns by name, and with allowMissingColumns=True (available since Spark 3.1) it fills any column missing from one side with nulls:

# Your own initialization here
df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)

# Use unionByName
df_final = df1.unionByName(df2, allowMissingColumns=True)

Output:

>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
|       Location+Type|Year|  state|Census_tract|     County_name|  A|  B|  C|   D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County|  1|  4|  7|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County|  2|  5|  8|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County|  3|  6|  9|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County| 11| 44| 77|  10|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County| 22| 55| 88|  20|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County| 33| 66| 99|  30|
+--------------------+----+-------+------------+----------------+---+---+---+----+
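
If your EMR cluster runs a Spark version older than 3.1, allowMissingColumns is not available, so you have to add the missing columns yourself before the union. Here is a minimal sketch of that workaround; the helper name union_missing is my own, not part of the PySpark API:

from pyspark.sql import functions as F

def union_missing(left, right):
    # Add any column that exists only on the other side, filled with nulls
    # and cast to the matching type so the union resolves cleanly
    for col in set(right.columns) - set(left.columns):
        left = left.withColumn(col, F.lit(None).cast(right.schema[col].dataType))
    for col in set(left.columns) - set(right.columns):
        right = right.withColumn(col, F.lit(None).cast(left.schema[col].dataType))
    # unionByName matches on column names, so column order does not matter
    return left.unionByName(right)

df_final = union_missing(df1, df2)

This produces the same result as the allowMissingColumns version above.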

Update

Assuming there are 12 dataframes, how would I adjust the code?

from pyspark.sql import DataFrame
from functools import reduce, partial

df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)
df3 = spark.createDataFrame(df3)
df4 = spark.createDataFrame(df4)

dfs = [df1, df2, df3, df4]  # extend this list with all 12 dataframes
unionByName = partial(DataFrame.unionByName, allowMissingColumns=True)
df_final = reduce(unionByName, dfs)

Output:

>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
|       Location+Type|Year|  state|Census_tract|     County_name|  A|  B|  C|   D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County|  1|  4|  7|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County|  2|  5|  8|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County|  3|  6|  9|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County| 11| 44| 77|  10|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County| 22| 55| 88|  20|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County| 33| 66| 99|  30|
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County| 21| 24| 27|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County| 22| 25| 28|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County| 23| 26| 29|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County|111|144|177| 110|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County|122|155|188| 120|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County|133|166|199| 130|
+--------------------+----+-------+------------+----------------+---+---+---+----+
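
Since the source data is too big for pandas in the first place, on EMR you would typically skip the pandas step entirely and read each yearly file straight into Spark before the union. A rough sketch under the assumption that the 12 inputs are CSV files on S3 (the bucket and file names below are hypothetical placeholders, not from the question):

from functools import reduce, partial
from pyspark.sql import DataFrame

# Hypothetical input locations - replace with your real S3 paths
paths = [f"s3://my-bucket/census/acs_{year}.csv" for year in range(2009, 2021)]

# Read each file directly into Spark; no pandas conversion needed
dfs = [spark.read.csv(p, header=True, inferSchema=True) for p in paths]

# Stack all 12 dataframes, filling columns that only some years have with nulls
unionByName = partial(DataFrame.unionByName, allowMissingColumns=True)
df_final = reduce(unionByName, dfs)

# Write the result back to S3 as Parquet instead of collecting it to the driver
df_final.write.mode("overwrite").parquet("s3://my-bucket/census/merged/")

Because Spark evaluates lazily, nothing heavy runs until the final write, and the work is distributed across the EMR cluster rather than a single EC2 instance.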