基于pyspark中的键有效地推断数据帧模式
Efficiently inferring dataframe schema based on key in pyspark
我有一个很大的行(行)数据集 json。这些行有多个字段,并且存在的字段取决于该行中的 json 字段之一。这是一个小例子:
%pyspark
data = sc.parallelize([{'key':'k1','a':1.0,'b':2.0},
{'key':'k1','a':1.0,'b':20.0},
{'key':'k1','a':100.0,'b':.2},
{'key':'k2','y':10.0,'z':20.0},
{'key':'k2','y':1.0,'z':250.0},
{'key':'k1','a':1.0,'b':2.0},], 2)
我的目标是将这些数据放入 Dataframe 中,而无需指定架构。 Pyspark 有(至少)两个函数来帮助解决这个问题:1) toDF()
,它只将第一行数据作为模式和 2) sqlContext.createDataFrame()
,您可以在其中指定要采样的行的比例为了推断模式。例如:
data.toDF().show()
+-----+----+---+
| a| b|key|
+-----+----+---+
| 1.0| 2.0| k1|
| 1.0|20.0| k1|
|100.0| 0.2| k1|
| null|null| k2|
| null|null| k2|
| 1.0| 2.0| k1|
+-----+----+---+
sqlContext.createDataFrame(data,samplingRatio=1).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
| 1.0| 2.0| k1|null| null|
+-----+----+---+----+-----+
sqlContext.createDataFrame()
做我想做的事,但由于我在 40 亿行中可能只有五个键,所以我认为必须有一种更快的方法来推断模式。此外,有些密钥非常稀有,所以我不能逃避让 samplingRatio
变小。
如果只有几种行类型,是否有一种优雅而快速的方法来推断架构?
更多的谷歌搜索让我找到了解决方案。
首先创建一个强大的数据帧连接器(unionAll
无法合并架构):
def addEmptyColumns(df, colNames):
exprs = df.columns + ["null as " + colName for colName in colNames]
return df.selectExpr(*exprs)
def concatTwoDfs(left, right):
# append columns from right df to left df
missingColumnsLeft = set(right.columns) - set(left.columns)
left = addEmptyColumns(left, missingColumnsLeft)
# append columns from left df to right df
missingColumnsRight = set(left.columns) - set(right.columns)
right = addEmptyColumns(right, missingColumnsRight)
# let's set the same order of columns
right = right[left.columns]
# finally, union them
return left.unionAll(right)
def concat(dfs):
return reduce(concatTwoDfs, dfs)
(代码来自 https://lab.getbase.com/pandarize-spark-dataframes/)
然后获取不同的键,制作数据帧列表,并将它们连接起来:
keys = data.map(lambda x: x['key']).distinct().collect()
a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys]
concat(a_grp).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| 1.0| 2.0| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
+-----+----+---+----+-----+
我有一个很大的行(行)数据集 json。这些行有多个字段,并且存在的字段取决于该行中的 json 字段之一。这是一个小例子:
%pyspark
data = sc.parallelize([{'key':'k1','a':1.0,'b':2.0},
{'key':'k1','a':1.0,'b':20.0},
{'key':'k1','a':100.0,'b':.2},
{'key':'k2','y':10.0,'z':20.0},
{'key':'k2','y':1.0,'z':250.0},
{'key':'k1','a':1.0,'b':2.0},], 2)
我的目标是将这些数据放入 Dataframe 中,而无需指定架构。 Pyspark 有(至少)两个函数来帮助解决这个问题:1) toDF()
,它只将第一行数据作为模式和 2) sqlContext.createDataFrame()
,您可以在其中指定要采样的行的比例为了推断模式。例如:
data.toDF().show()
+-----+----+---+
| a| b|key|
+-----+----+---+
| 1.0| 2.0| k1|
| 1.0|20.0| k1|
|100.0| 0.2| k1|
| null|null| k2|
| null|null| k2|
| 1.0| 2.0| k1|
+-----+----+---+
sqlContext.createDataFrame(data,samplingRatio=1).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
| 1.0| 2.0| k1|null| null|
+-----+----+---+----+-----+
sqlContext.createDataFrame()
做我想做的事,但由于我在 40 亿行中可能只有五个键,所以我认为必须有一种更快的方法来推断模式。此外,有些密钥非常稀有,所以我不能逃避让 samplingRatio
变小。
如果只有几种行类型,是否有一种优雅而快速的方法来推断架构?
更多的谷歌搜索让我找到了解决方案。
首先创建一个强大的数据帧连接器(unionAll
无法合并架构):
def addEmptyColumns(df, colNames):
exprs = df.columns + ["null as " + colName for colName in colNames]
return df.selectExpr(*exprs)
def concatTwoDfs(left, right):
# append columns from right df to left df
missingColumnsLeft = set(right.columns) - set(left.columns)
left = addEmptyColumns(left, missingColumnsLeft)
# append columns from left df to right df
missingColumnsRight = set(left.columns) - set(right.columns)
right = addEmptyColumns(right, missingColumnsRight)
# let's set the same order of columns
right = right[left.columns]
# finally, union them
return left.unionAll(right)
def concat(dfs):
return reduce(concatTwoDfs, dfs)
(代码来自 https://lab.getbase.com/pandarize-spark-dataframes/)
然后获取不同的键,制作数据帧列表,并将它们连接起来:
keys = data.map(lambda x: x['key']).distinct().collect()
a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys]
concat(a_grp).show()
+-----+----+---+----+-----+
| a| b|key| y| z|
+-----+----+---+----+-----+
| 1.0| 2.0| k1|null| null|
| 1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| 1.0| 2.0| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
+-----+----+---+----+-----+