更改 spark 数据框中列的可为空 属性
Change nullable property of column in spark dataframe
我正在为某些测试手动创建数据框。创建它的代码是:
case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
.createDataFrame(List(input(1110,0,1001,-10.00),
input(1111,1,1001,10.00),
input(1111,0,1002,10.00)))
因此架构如下所示:
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
我想为这些变量中的每一个创建 'nullable = true'。我如何从一开始就声明它或在创建后在新数据框中切换它?
回答
随着进口
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
你可以使用
/**
* Set nullable property of column.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either nullable or not
*/
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
case y: StructField => y
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
直接。
您还可以通过 "pimp my library" 库模式(请参阅我的 SO post What is the best way to define custom methods on a DataFrame? )使该方法可用,这样您就可以调用
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
编辑
备选方案一
使用 setNullableStateOfColumn
的略微修改版本
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
备选方案2
明确定义模式。 (使用反射创建更通用的解决方案)
configuredUnitTest("Whosebug.") { sparkContext =>
case class Input(id:Long, var1:Int, var2:Int, var3:Double)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
// use this to set the schema explicitly or
// use refelection on the case class member to construct the schema
val schema = StructType( Seq (
StructField( "id", LongType, true),
StructField( "var1", IntegerType, true),
StructField( "var2", IntegerType, true),
StructField( "var3", DoubleType, true)
))
val is: List[Input] = List(
Input(1110, 0, 1001,-10.00),
Input(1111, 1, 1001, 10.00),
Input(1111, 0, 1002, 10.00)
)
val rdd: RDD[Input] = sparkContext.parallelize( is )
val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
val inputDF = sqlContext.createDataFrame( rowRDD, schema )
inputDF.printSchema
inputDF.show()
}
这是一个迟到的答案,但想为来到这里的人提供一个替代解决方案。通过对代码进行以下修改,您可以从一开始就自动使 DataFrame
Column
可为空:
case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
.createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
input(Some(1111),Some(1),1001,10.00),
input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema
这将产生:
root
|-- id: long (nullable = true)
|-- var1: integer (nullable = true)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
本质上,如果您通过使用 Some([element])
或 None
作为实际输入将字段声明为 Option
,则该字段可为空。否则,该字段将不可为空。希望对您有所帮助!
在您的情况 class.
中,只需使用 java.lang.Integer 而不是 scala.Int
case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
设置所有列可为空参数的更紧凑版本
可以使用 _.copy(nullable = nullable)
而不是 case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
。那么整个函数可以写成:
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
另一种选择,如果您需要就地更改数据框,并且无法重新创建,您可以这样做:
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
Spark 会认为此列可能包含 null
,并且可空性将设置为 true
。
此外,您可以使用 udf
,将您的值包装在 Option
中。
即使对于流媒体案例也能正常工作。
谢谢Martin Senne。
只是一点补充。对于内部结构类型,您可能需要递归设置 nullable,如下所示:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
def set(st: StructType): StructType = {
StructType(st.map {
case StructField(name, dataType, _, metadata) =>
val newDataType = dataType match {
case t: StructType => set(t)
case _ => dataType
}
StructField(name, newDataType, nullable = nullable, metadata)
})
}
df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}
当您想删除一个列并在 spark 数据框中创建一个新列时,您可以创建一个可为空的列,例如。
- df.withColumn("Employee_Name", when(lit('') == '', '').否则(lit(None)))
注意:如果您想创建一个字符串类型的列并使其可为空,则以上代码有效
- df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))
注意:如果您想创建一个整数类型的列并使其可为空,则以上代码有效
我正在为某些测试手动创建数据框。创建它的代码是:
case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
.createDataFrame(List(input(1110,0,1001,-10.00),
input(1111,1,1001,10.00),
input(1111,0,1002,10.00)))
因此架构如下所示:
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
我想为这些变量中的每一个创建 'nullable = true'。我如何从一开始就声明它或在创建后在新数据框中切换它?
回答
随着进口
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
你可以使用
/**
* Set nullable property of column.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either nullable or not
*/
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
case y: StructField => y
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
直接。
您还可以通过 "pimp my library" 库模式(请参阅我的 SO post What is the best way to define custom methods on a DataFrame? )使该方法可用,这样您就可以调用
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
编辑
备选方案一
使用 setNullableStateOfColumn
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
备选方案2
明确定义模式。 (使用反射创建更通用的解决方案)
configuredUnitTest("Whosebug.") { sparkContext =>
case class Input(id:Long, var1:Int, var2:Int, var3:Double)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
// use this to set the schema explicitly or
// use refelection on the case class member to construct the schema
val schema = StructType( Seq (
StructField( "id", LongType, true),
StructField( "var1", IntegerType, true),
StructField( "var2", IntegerType, true),
StructField( "var3", DoubleType, true)
))
val is: List[Input] = List(
Input(1110, 0, 1001,-10.00),
Input(1111, 1, 1001, 10.00),
Input(1111, 0, 1002, 10.00)
)
val rdd: RDD[Input] = sparkContext.parallelize( is )
val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
val inputDF = sqlContext.createDataFrame( rowRDD, schema )
inputDF.printSchema
inputDF.show()
}
这是一个迟到的答案,但想为来到这里的人提供一个替代解决方案。通过对代码进行以下修改,您可以从一开始就自动使 DataFrame
Column
可为空:
case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
.createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
input(Some(1111),Some(1),1001,10.00),
input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema
这将产生:
root
|-- id: long (nullable = true)
|-- var1: integer (nullable = true)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
本质上,如果您通过使用 Some([element])
或 None
作为实际输入将字段声明为 Option
,则该字段可为空。否则,该字段将不可为空。希望对您有所帮助!
在您的情况 class.
中,只需使用 java.lang.Integer 而不是 scala.Intcase class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
设置所有列可为空参数的更紧凑版本
可以使用 _.copy(nullable = nullable)
而不是 case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
。那么整个函数可以写成:
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
另一种选择,如果您需要就地更改数据框,并且无法重新创建,您可以这样做:
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
Spark 会认为此列可能包含 null
,并且可空性将设置为 true
。
此外,您可以使用 udf
,将您的值包装在 Option
中。
即使对于流媒体案例也能正常工作。
谢谢Martin Senne。 只是一点补充。对于内部结构类型,您可能需要递归设置 nullable,如下所示:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
def set(st: StructType): StructType = {
StructType(st.map {
case StructField(name, dataType, _, metadata) =>
val newDataType = dataType match {
case t: StructType => set(t)
case _ => dataType
}
StructField(name, newDataType, nullable = nullable, metadata)
})
}
df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}
当您想删除一个列并在 spark 数据框中创建一个新列时,您可以创建一个可为空的列,例如。
- df.withColumn("Employee_Name", when(lit('') == '', '').否则(lit(None)))
注意:如果您想创建一个字符串类型的列并使其可为空,则以上代码有效
- df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))
注意:如果您想创建一个整数类型的列并使其可为空,则以上代码有效