Spark: Counting NaNs in a vector column
Suppose I have a Spark DataFrame that looks like this:
+---+------------------+
| id|          features|
+---+------------------+
|  1|[57.0,1.0,0.0,0.0]|
|  2|[63.0,NaN,0.0,0.0]|
|  3|[74.0,1.0,3.0,NaN]|
|  4|[67.0,NaN,0.0,0.0]|
|  5| [NaN,1.0,NaN,NaN]|
+---+------------------+
where each row of the features column is a DenseVector containing a mix of float values and NaNs. Is there a way to count the NaNs in the first column of the DenseVector, or in any arbitrary column? For example, I would like something that returns 1 NaN for the first column, 3 for the second, and 2 for the fourth.
As far as I know, Spark SQL doesn't provide a method for this, but it is trivial with RDDs and a bit of NumPy.
from pyspark.ml.linalg import DenseVector, Vector
import numpy as np

df = sc.parallelize([
    (1, DenseVector([57.0, 1.0, 0.0, 0.0])),
    (2, DenseVector([63.0, float("NaN"), 0.0, 0.0])),
    (3, DenseVector([74.0, 1.0, 3.0, float("NaN")])),
    (4, DenseVector([67.0, float("NaN"), 0.0, 0.0])),
    (5, DenseVector([float("NaN"), 1.0, float("NaN"), float("NaN")])),
]).toDF(["id", "features"])
(df
    .select("features")
    .rdd
    .map(lambda x: np.isnan(x.features.array))
    .sum())

array([1, 2, 1, 2])
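This works because np.isnan turns each vector's underlying array into a boolean array, and RDD.sum adds these arrays elementwise, promoting the booleans to integers along the way. If you only care about one particular position, you can simply index the result; a small illustration against the DataFrame above (counts and first_col_nans are just names introduced here):

counts = (df
    .select("features")
    .rdd
    .map(lambda x: np.isnan(x.features.array))
    .sum())

# counts is a NumPy array of per-position totals: array([1, 2, 1, 2]) here
first_col_nans = int(counts[0])          # 1 NaN in the first vector position
print(dict(enumerate(counts.tolist())))  # {0: 1, 1: 2, 2: 1, 3: 2}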
You can do a similar thing with SQL, but it takes a bit more effort. A helper function:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import Column
from typing import List

def as_array(c: Column) -> Column:
    def as_array_(v: Vector) -> List[float]:
        return v.array.tolist()
    return udf(as_array_, ArrayType(DoubleType()))(c)
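To see what the helper produces, it can be applied directly to the features column; an illustrative call against the df defined above (features_array is just a label chosen here):

from pyspark.sql.functions import col

# Each row now holds a plain array<double> instead of a VectorUDT value,
# so built-in functions such as size and isnan can operate on its elements.
df.select(as_array(col("features")).alias("features_array")).show(truncate=False)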
Determine the size of the vector:
from pyspark.sql.functions import col, size
(vlen, ) = df.na.drop().select(size(as_array(col("features")))).first()
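For the example data this yields vlen == 4, the length of each DenseVector. A quick cross-check, assuming at least one row has a non-null features value:

# Cross-check: the length of any single DenseVector matches vlen
first_vec = df.na.drop(subset=["features"]).first().features
assert len(first_vec) == vlen   # 4 for the example data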
Create the expressions:
from pyspark.sql.functions import col, isnan, sum as sum_
feature_array = as_array("features").alias("features")
And finally select:
(df
    .na.drop(subset=["features"])
    .select([sum_(isnan(feature_array[i]).cast("bigint")) for i in range(vlen)]))
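The result is a single row with one count per vector position (1, 2, 1, 2 for the data above). On Spark 3.0+ the Python UDF can be avoided altogether, since pyspark.ml.functions.vector_to_array converts the vector column into a native array<double>; a sketch under that version assumption, reusing vlen from above:

from pyspark.ml.functions import vector_to_array   # available since Spark 3.0
from pyspark.sql.functions import col, isnan, sum as sum_

arr = vector_to_array(col("features"))

# Same per-position NaN counts, computed without a Python UDF
(df
    .select([sum_(isnan(arr[i]).cast("bigint")).alias(f"nan_{i}") for i in range(vlen)])
    .show())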