过滤记录以检查特定列是否存在 java.lang.NullPointerException
Filtering records to check if a certain column exists giving java.lang.NullPointerException
所以我有一个这种格式的记录数据框-
{
"table": "SYSMAN.EM_METRIC_COLUMN_VER_E",
"op_type": "I",
"op_ts": "2021-03-24 13:15:31.396105",
"pos": "00000000000000000000",
"after": {
"METRIC_GROUP_ID": 4700,
"METRIC_COLUMN_ID": 293339,
"METRIC_GROUP_VERSION_ID": 41670
}
}
我想根据某个列的存在来过滤这些记录。如果它在“after”结构中有该列(如 METRIC_GROUP_ID、METRIC_COLUMN_ID、METRIC_GROUP_VERSION_ID),我想将它添加到列表中。
这是我写的代码-
def HasColumn(row: Row, Column:String) =
Try(row.getAs[Row]("before").getAs[Any](Column)).isSuccess || Try(row.getAs[Row]("after").getAs[Any](Column)).isSuccess
var records_list: List[Row] = null
for(row<-inputDS){if(HasColumn(row,Column_String)){records_list:+row}}
我从最后一行得到以下异常-
21/06/02 21:54:02 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 12)
java.lang.NullPointerException
我知道您无法从传递给 Spark DataFrame/RDD 转换之一的函数中访问 Spark 的任何“驱动程序端”抽象(RDD、DataFrames、Dataset、SparkSession...),因为它们仅存在于您的驱动程序应用程序中。所以我尽量避免它,但我没有得到任何解决方案。
试试下面的代码。
创建 UDF
scala> def hasColumn = udf((row:Row,column:String) => Try(row.getAs[Row]("before").getAs[Any](column)).isSuccess || Try(row.getAs[Row]("after").getAs[Any](column)).isSuccess)
使用 UDF 检查列是否可用。
scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after |op_ts |op_type|pos |table |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
并在新列上添加过滤条件。
scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).filter($"has" === true).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after |op_ts |op_type|pos |table |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
scala> df.withColumn("has",hasColumn(struct($"*"),lit("Column_does_not_exist"))).filter($"has" === true).show(false)
+-----+-----+-------+---+-----+---+
|after|op_ts|op_type|pos|table|has|
+-----+-----+-------+---+-----+---+
+-----+-----+-------+---+-----+---+
我不知道你是否必须将其作为 RDD 或数据集来执行,如果是 RDD,解决方案看起来像这样作为一个函数
def filterData(data: RDD[Row], column: String): RDD[Row] = {
data.filter { r =>
Try(r.getAs[Row]("before").getAs[Any](column))
.orElse(Try(r.getAs[Row]("after").getAs[Any](column)))
.isSuccess
}
}
如果你想减少那里的代码量,我们可以做到
def filterData(data: RDD[Row], column: String): RDD[Row] = {
data.filter { r =>
Seq("before", "after").map(c => Try(r.getAs[Row](c)).map(_.getAs[Any](column)))
.reduce(_ orElse _).isSuccess
}
}
这样做的好处是,如果您想添加更多要搜索的地方,而不仅仅是之前和之后,您只需将其添加到 Seq
对于数据集,您只需要检查列是否存在且是否为非空
df.where(col(column).isNotNull)
实际上,两者都假设你有一个固定的模式(即使是推断的),所以数据集要简单得多。
所以我有一个这种格式的记录数据框-
{
"table": "SYSMAN.EM_METRIC_COLUMN_VER_E",
"op_type": "I",
"op_ts": "2021-03-24 13:15:31.396105",
"pos": "00000000000000000000",
"after": {
"METRIC_GROUP_ID": 4700,
"METRIC_COLUMN_ID": 293339,
"METRIC_GROUP_VERSION_ID": 41670
}
}
我想根据某个列的存在来过滤这些记录。如果它在“after”结构中有该列(如 METRIC_GROUP_ID、METRIC_COLUMN_ID、METRIC_GROUP_VERSION_ID),我想将它添加到列表中。
这是我写的代码-
def HasColumn(row: Row, Column:String) =
Try(row.getAs[Row]("before").getAs[Any](Column)).isSuccess || Try(row.getAs[Row]("after").getAs[Any](Column)).isSuccess
var records_list: List[Row] = null
for(row<-inputDS){if(HasColumn(row,Column_String)){records_list:+row}}
我从最后一行得到以下异常-
21/06/02 21:54:02 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 12)
java.lang.NullPointerException
我知道您无法从传递给 Spark DataFrame/RDD 转换之一的函数中访问 Spark 的任何“驱动程序端”抽象(RDD、DataFrames、Dataset、SparkSession...),因为它们仅存在于您的驱动程序应用程序中。所以我尽量避免它,但我没有得到任何解决方案。
试试下面的代码。
创建 UDF
scala> def hasColumn = udf((row:Row,column:String) => Try(row.getAs[Row]("before").getAs[Any](column)).isSuccess || Try(row.getAs[Row]("after").getAs[Any](column)).isSuccess)
使用 UDF 检查列是否可用。
scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after |op_ts |op_type|pos |table |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
并在新列上添加过滤条件。
scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).filter($"has" === true).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after |op_ts |op_type|pos |table |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
scala> df.withColumn("has",hasColumn(struct($"*"),lit("Column_does_not_exist"))).filter($"has" === true).show(false)
+-----+-----+-------+---+-----+---+
|after|op_ts|op_type|pos|table|has|
+-----+-----+-------+---+-----+---+
+-----+-----+-------+---+-----+---+
我不知道你是否必须将其作为 RDD 或数据集来执行,如果是 RDD,解决方案看起来像这样作为一个函数
def filterData(data: RDD[Row], column: String): RDD[Row] = {
data.filter { r =>
Try(r.getAs[Row]("before").getAs[Any](column))
.orElse(Try(r.getAs[Row]("after").getAs[Any](column)))
.isSuccess
}
}
如果你想减少那里的代码量,我们可以做到
def filterData(data: RDD[Row], column: String): RDD[Row] = {
data.filter { r =>
Seq("before", "after").map(c => Try(r.getAs[Row](c)).map(_.getAs[Any](column)))
.reduce(_ orElse _).isSuccess
}
}
这样做的好处是,如果您想添加更多要搜索的地方,而不仅仅是之前和之后,您只需将其添加到 Seq
对于数据集,您只需要检查列是否存在且是否为非空
df.where(col(column).isNotNull)
实际上,两者都假设你有一个固定的模式(即使是推断的),所以数据集要简单得多。