当 udf 函数不接受足够大的输入变量时,Spark DataFrames
Spark DataFrames when udf functions do not accept large enough input variables
我正在准备一个带有 id 和我的特征向量的 DataFrame 以供稍后用于进行预测。我在我的数据框上做了一个 groupBy,在我的 groupBy 中,我将几列作为列表合并到一个新列中:
def mergeFunction(...) // with 14 input variables
val myudffunction( mergeFunction ) // Spark doesn't support this
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
myudffunction(
col(...)
, col(...) )
.select("id", "feature_labels")
这就是我创建特征向量及其标签的方式。到目前为止它一直在为我工作,但这是我使用此方法的特征向量第一次大于数字 10,这是 Spark 中的 udf 函数最多接受的值。
I am not sure how else I can fix this? Is the size of udf inputs in
Spark going to get bigger, am have I understood them incorrectly, or
there is a better way?
最多为 22 个参数定义了用户定义函数。最多只为 10 个参数定义 udf
帮助程序。要处理具有更多参数的函数,您可以使用 org.apache.spark.sql.UDFRegistration
.
例如
val dummy = ((
x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int,
x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int,
x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1)
范被注册:
import org.apache.spark.sql.expressions.UserDefinedFunction
val dummyUdf: UserDefinedFunction = spark.udf.register("dummy", dummy)
并直接使用
val df = spark.range(1)
val exprs = (0 to 21).map(_ => lit(1))
df.select(dummyUdf(exprs: _*))
或通过 callUdf
的名称
import org.apache.spark.sql.functions.callUDF
df.select(
callUDF("dummy", exprs: _*).alias("dummy")
)
或SQL表达式:
df.selectExpr(s"""dummy(${Seq.fill(22)(1).mkString(",")})""")
您还可以创建一个 UserDefinedFunction
对象:
import org.apache.spark.sql.expressions.UserDefinedFunction
Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*))
在实践中,具有 22 个参数的函数并不是很有用,除非您想使用 Scala 反射来生成这些参数,否则维护噩梦。
我会考虑使用集合 (array
、map
) 或 struct
作为输入或将其分成多个模块。例如:
val aLongArray = array((0 to 256).map(_ => lit(1)): _*)
val udfWitharray = udf((xs: Seq[Int]) => 1)
Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy"))
只是为了扩展零的答案,可以使 .withColumn()
函数与具有 10 个以上参数的 UDF 一起工作。只需要 spark.udf.register()
函数,然后使用 expr
作为添加列的参数(而不是 udf
)。
例如,像这样的东西应该可以工作:
def mergeFunction(...) // with 14 input variables
spark.udf.register("mergeFunction", mergeFunction) // make available in expressions
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
expr("mergeFunction(col1, col2, col3, col4, ...)") ) //pass in the 14 column names
.select("id", "feature_labels")
底层表达式解析器似乎可以处理超过 10 个参数,因此我认为您不必诉诸传递数组来调用函数。此外,如果它们的参数恰好是不同的数据类型,数组将无法正常工作。
我正在准备一个带有 id 和我的特征向量的 DataFrame 以供稍后用于进行预测。我在我的数据框上做了一个 groupBy,在我的 groupBy 中,我将几列作为列表合并到一个新列中:
def mergeFunction(...) // with 14 input variables
val myudffunction( mergeFunction ) // Spark doesn't support this
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
myudffunction(
col(...)
, col(...) )
.select("id", "feature_labels")
这就是我创建特征向量及其标签的方式。到目前为止它一直在为我工作,但这是我使用此方法的特征向量第一次大于数字 10,这是 Spark 中的 udf 函数最多接受的值。
I am not sure how else I can fix this? Is the size of udf inputs in Spark going to get bigger, am have I understood them incorrectly, or there is a better way?
最多为 22 个参数定义了用户定义函数。最多只为 10 个参数定义 udf
帮助程序。要处理具有更多参数的函数,您可以使用 org.apache.spark.sql.UDFRegistration
.
例如
val dummy = ((
x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int,
x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int,
x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1)
范被注册:
import org.apache.spark.sql.expressions.UserDefinedFunction
val dummyUdf: UserDefinedFunction = spark.udf.register("dummy", dummy)
并直接使用
val df = spark.range(1)
val exprs = (0 to 21).map(_ => lit(1))
df.select(dummyUdf(exprs: _*))
或通过 callUdf
import org.apache.spark.sql.functions.callUDF
df.select(
callUDF("dummy", exprs: _*).alias("dummy")
)
或SQL表达式:
df.selectExpr(s"""dummy(${Seq.fill(22)(1).mkString(",")})""")
您还可以创建一个 UserDefinedFunction
对象:
import org.apache.spark.sql.expressions.UserDefinedFunction
Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*))
在实践中,具有 22 个参数的函数并不是很有用,除非您想使用 Scala 反射来生成这些参数,否则维护噩梦。
我会考虑使用集合 (array
、map
) 或 struct
作为输入或将其分成多个模块。例如:
val aLongArray = array((0 to 256).map(_ => lit(1)): _*)
val udfWitharray = udf((xs: Seq[Int]) => 1)
Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy"))
只是为了扩展零的答案,可以使 .withColumn()
函数与具有 10 个以上参数的 UDF 一起工作。只需要 spark.udf.register()
函数,然后使用 expr
作为添加列的参数(而不是 udf
)。
例如,像这样的东西应该可以工作:
def mergeFunction(...) // with 14 input variables
spark.udf.register("mergeFunction", mergeFunction) // make available in expressions
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
expr("mergeFunction(col1, col2, col3, col4, ...)") ) //pass in the 14 column names
.select("id", "feature_labels")
底层表达式解析器似乎可以处理超过 10 个参数,因此我认为您不必诉诸传递数组来调用函数。此外,如果它们的参数恰好是不同的数据类型,数组将无法正常工作。