使用 numpy 数组输入从 python 方法创建 PySpark UDF,以计算和 return 单个浮点值
Creating PySpark UDFs from python method with numpy array input, to calculate and return a single float value
作为输入,我有一个包含 int 值的 csv 文件。
spark_df = spark.read.option("header", "false").csv("../int_values.csv")
df = spark_df.selectExpr("_c0 as something")
_df = df.withColumn("values", df.something.cast(FloatType())).select("values")
我还有一些为 numpy 数组输入设计的 python 函数,我需要在 Spark DataFrame 上应用它们。
例子一:
def calc_sum(float_array):
return np.sum(float_array)
实函数:
def calc_rms(float_array):
return np.sqrt(np.mean(np.diff(float_array)**2))
对于第 1 个示例,您可以使用 SQL 总和,例如:
_df.groupBy().sum().collect()
但是,我需要的是将这些函数转换为 Spark UDF 的标准解决方案
我尝试了很多方法,比如:
udf_sum = udf(lambda x : calc_sum(x), FloatType())
_df.rdd.flatMap(udf_sum).collect()
但总是失败:
TypeError: Invalid argument, not a string or column:
Row(values=1114.0) of type <class 'pyspark.sql.types.Row'>. For column
literals, use 'lit', 'array', 'struct' or 'create_map' function.
是否可以使用这些函数来转换数据?
DataFrame 示例:
In [6]: spark_df.show()
+----+
| _c0|
+----+
|1114|
|1113|
|1066|
|1119|
|1062|
|1089|
|1093|
| 975|
|1099|
|1062|
|1062|
|1162|
|1057|
|1123|
|1141|
|1089|
|1172|
|1096|
|1164|
|1146|
+----+
only showing top 20 rows
预期输出:
从 UDF 返回的浮点值。
求和函数应该清楚了。
您想要的是 groupby 并使用 collect_list
将所有整数值放入数组列中,然后在该列上应用您的 UDF。此外,您需要明确 return 从 calc_rms
:
浮动
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
def calc_rms(float_array):
return float(np.sqrt(np.mean(np.diff(float_array) ** 2)))
calc_rms_udf = F.udf(calc_rms, FloatType())
df.groupby().agg(F.collect_list("_c0").alias("_c0")) \
.select(calc_rms_udf(F.col("_c0")).alias("rms")) \
.show()
#+--------+
#| rms|
#+--------+
#|67.16202|
#+--------+
作为输入,我有一个包含 int 值的 csv 文件。
spark_df = spark.read.option("header", "false").csv("../int_values.csv")
df = spark_df.selectExpr("_c0 as something")
_df = df.withColumn("values", df.something.cast(FloatType())).select("values")
我还有一些为 numpy 数组输入设计的 python 函数,我需要在 Spark DataFrame 上应用它们。
例子一:
def calc_sum(float_array):
return np.sum(float_array)
实函数:
def calc_rms(float_array):
return np.sqrt(np.mean(np.diff(float_array)**2))
对于第 1 个示例,您可以使用 SQL 总和,例如:
_df.groupBy().sum().collect()
但是,我需要的是将这些函数转换为 Spark UDF 的标准解决方案
我尝试了很多方法,比如:
udf_sum = udf(lambda x : calc_sum(x), FloatType())
_df.rdd.flatMap(udf_sum).collect()
但总是失败:
TypeError: Invalid argument, not a string or column: Row(values=1114.0) of type <class 'pyspark.sql.types.Row'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
是否可以使用这些函数来转换数据?
DataFrame 示例:
In [6]: spark_df.show()
+----+
| _c0|
+----+
|1114|
|1113|
|1066|
|1119|
|1062|
|1089|
|1093|
| 975|
|1099|
|1062|
|1062|
|1162|
|1057|
|1123|
|1141|
|1089|
|1172|
|1096|
|1164|
|1146|
+----+
only showing top 20 rows
预期输出:
从 UDF 返回的浮点值。
求和函数应该清楚了。
您想要的是 groupby 并使用 collect_list
将所有整数值放入数组列中,然后在该列上应用您的 UDF。此外,您需要明确 return 从 calc_rms
:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
def calc_rms(float_array):
return float(np.sqrt(np.mean(np.diff(float_array) ** 2)))
calc_rms_udf = F.udf(calc_rms, FloatType())
df.groupby().agg(F.collect_list("_c0").alias("_c0")) \
.select(calc_rms_udf(F.col("_c0")).alias("rms")) \
.show()
#+--------+
#| rms|
#+--------+
#|67.16202|
#+--------+