pyspark dataframe 如果不存在则添加一列
pyspark dataframe add a column if it doesn't exist
我在各种 json 文件中有 json 数据并且键的行可能不同,例如
{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}
我想聚合列 'b'、'c'、'd' 和 'f' 上的数据,这些数据不存在于给定的 json 文件中,但可以存在于其他文件中。因此,由于 'f' 列不存在,我们可以为该列取空字符串。
我正在读取输入文件并像这样聚合数据
import pyspark.sql.functions as f
df = spark.read.json(inputfile)
df2 =df.groupby("b","c","d","f").agg(f.sum(df["a"]))
这是我想要的最终输出
{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}
有人可以帮忙吗?提前致谢!
您可以检查列在数据框中是否可用,并仅在必要时修改df
:
if not 'f' in df.columns:
df = df.withColumn('f', f.lit(''))
对于嵌套架构,您可能需要使用 df.schema
,如下所示:
>>> df.printSchema()
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
万一有人在 Scala 中需要这个:
if (!df.columns.contains("f")) {
val newDf = df.withColumn("f", lit(""))
}
这个函数结果对我来说。
def detect_data(column, df, data_type):
if not column in df.columns:
ret = lit(None).cast(data_type)
else:
ret = col(column).cast(data_type)
return ret
df = df.withColumn('f', detect_data('f', df, StringType()))
我在各种 json 文件中有 json 数据并且键的行可能不同,例如
{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}
我想聚合列 'b'、'c'、'd' 和 'f' 上的数据,这些数据不存在于给定的 json 文件中,但可以存在于其他文件中。因此,由于 'f' 列不存在,我们可以为该列取空字符串。
我正在读取输入文件并像这样聚合数据
import pyspark.sql.functions as f
df = spark.read.json(inputfile)
df2 =df.groupby("b","c","d","f").agg(f.sum(df["a"]))
这是我想要的最终输出
{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}
有人可以帮忙吗?提前致谢!
您可以检查列在数据框中是否可用,并仅在必要时修改df
:
if not 'f' in df.columns:
df = df.withColumn('f', f.lit(''))
对于嵌套架构,您可能需要使用 df.schema
,如下所示:
>>> df.printSchema()
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
万一有人在 Scala 中需要这个:
if (!df.columns.contains("f")) {
val newDf = df.withColumn("f", lit(""))
}
这个函数结果对我来说。
def detect_data(column, df, data_type):
if not column in df.columns:
ret = lit(None).cast(data_type)
else:
ret = col(column).cast(data_type)
return ret
df = df.withColumn('f', detect_data('f', df, StringType()))