在 UDAF 中重用 SparkContext 对象
Re-using SparkContext object in UDAF
我正在尝试实现 org.apache.spark.mllib.stat.KernelDensity
的聚合版本以同时估计多个分布的概率密度函数。
我们的想法是拥有一个包含 2 列的数据框:一列用于组名,第二列包含单变量观察值(将有 1000 组,因此需要并发处理)。
我的想法是这样的(pdf
列将包含一个包含 PDF 值的数组):
> val getPdf = new PDFGetter(sparkContext)
> df_with_group_and_observation_columns.groupBy("group").agg(getPdf(col("observations")).as("pdf")).show()
我已经实现了一个用户定义的聚合函数来(希望)做到这一点。
我对当前的实施有 2 个问题,正在寻求您的建议:
- 显然无法在 UDAF 的
evaluate()
函数中重复使用 sparkContext
对象。一旦 UDAF 尝试访问 sparkContext 对象(请参阅下面的详细信息),我目前会收到 java.io.NotSerializableException
。 ==> 这正常吗?关于如何解决这个问题有什么想法吗?
- 我当前的 UDAF 实现将从数据框(分布式)中获取每个组的所有观察值,将它们放入
Seq()
(WrappedArray)中,然后尝试 运行 parallelize()
在每个组的 Seq()
上重新分配观察结果。这似乎效率很低。 ==> 有没有办法让 UDAF 在 运行 时间内直接 "give" 每个组的 "sub-RDD" 到它的每个 evaluate()
函数?
下面是我到目前为止的一个完整示例(不要介意 return 值是字符串而不是数组,我只是想看看是否可以让内核密度在 UDAF 中工作现在):
Spark context available as 'sc' (master = local[*], app id = local-1514639826952).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
scala> sc.toString
res27: String = org.apache.spark.SparkContext@2a96ed1b
scala> val df = Seq(("a", 1.0), ("a", 1.5), ("a", 2.0), ("a", 1.8), ("a", 1.1), ("a", 1.2), ("a", 1.9), ("a", 1.3), ("a", 1.2), ("a", 1.9), ("b", 10.0), ("b", 20.0), ("b", 11.0), ("b", 18.0), ("b", 13.0), ("b", 16.0), ("b", 15.0), ("b", 12.0), ("b", 18.0), ("b", 11.0)).toDF("group", "val")
scala> val getPdf = new PDFGetter(sc)
scala> df.groupBy("group").agg(getPdf(col("val")).as("pdf")).show()
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2a96ed1b)
- field (class: PDFGetter, name: sc, type: class org.apache.spark.SparkContext)
- object (class PDFGetter, PDFGetter@38649ca3)
...
请参阅下面的 UDAF 定义(否则效果很好):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.{ListBuffer, ArrayBuffer}
import org.apache.spark.mllib.stat.KernelDensity
class PDFGetter(var sc: org.apache.spark.SparkContext) extends UserDefinedAggregateFunction {
// Define the schema of the input data,
// intermediate processing (deals with each individual observation within each group)
// and return type of the UDAF
override def inputSchema: StructType = StructType(StructField("result_dbl", DoubleType) :: Nil)
override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
override def dataType: DataType = StringType
// The UDAF will always return the same results
// given the same inputs
override def deterministic: Boolean = true
// How to initialize the intermediate processing buffer
// for each group
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Array.emptyDoubleArray
}
// What to do with each new row within the group
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
var values = new ListBuffer[Double]()
values.appendAll(buffer.getAs[List[Double]](0))
val newValue = input.getDouble(0)
values.append(newValue)
buffer.update(0, values)
}
// How to merge 2 buffers located on 2 separate
// executor hosts or JVMs
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
var values = new ListBuffer[Double]()
values ++= buffer1.getAs[List[Double]](0)
values ++= buffer2.getAs[List[Double]](0)
buffer1.update(0, values)
}
// What to do with the data once intermediate processing
// is completed
override def evaluate(buffer: Row): String = {
// Get the observations
val observations = buffer.getSeq[Double](0) // Or val observations = buffer.getAs[Seq[Double]](0) // Returns a WrappedArray either way
//observations.toString
// Calculate the bandwidth
val nObs = observations.size.toDouble
val mean = observations.sum / nObs
val stdDev = Math.sqrt(observations.map(x => Math.pow(x - mean, 2.0) ).sum / nObs)
val bandwidth = stdDev / 2.5
//bandwidth.toString
// Kernel Density
// From the example at http://spark.apache.org/docs/latest/api/java/index.html#org.apache.spark.sql.Dataset
// val sample = sc.parallelize(Seq(0.0, 1.0, 4.0, 4.0))
// val kd = new KernelDensity()
// .setSample(sample)
// .setBandwidth(3.0)
// val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
// Get the observations as an rdd (required by KernelDensity.setSample)
sc.toString // <==== This fails
val observationsRDD = sc.parallelize(observations)
// Create a new Kernel density object
// for these observations
val kd = new KernelDensity()
kd.setSample(observationsRDD)
kd.setBandwidth(bandwidth)
// Create the points at which
// the PDF will be estimated
val minObs = observations.min
val maxObs = observations.max
val nPoints = Math.min(nObs/2, 1000.0).toInt
val increment = (maxObs - minObs) / nPoints.toDouble
val points = new Array[Double](nPoints)
for( i <- 0 until nPoints){
points(i) = minObs + i.toDouble * increment;
}
// Estimate the PDF and return
val pdf = kd.estimate(points)
pdf.toString
}
}
很抱歉拖了这么久post,但我觉得这件事很棘手,所以我想所有的细节都会对那里的任何帮助者有用。
谢谢!
这是行不通的。你不能:
- 在执行器上访问
SparkContext
、SparkSession
、SQLContext
(其中调用 evaluate
)。
- 在执行器上访问或创建分布式数据结构。
回答可能的后续问题 - 没有解决方法。这是一个核心设计决策,是 Spark 设计的基础。
我能够通过使用 Kernel Density 的另一种实现来消除对 sc.parallelize(observations)
的需求,它不需要通过 RDD 提供观察结果,而是一个简单的 Array[Double]
(因此不分发) ).
详情请见以下链接:
http://haifengl.github.io/smile/index.html
https://github.com/haifengl/smile
注意:对于那些想要了解 UDAF 的人来说,上面的代码仍然是一个不错的示例 - 只需删除构造函数中的参数 sc
并确保不要尝试使用 SparkContext
在任何 UDAF 函数中。
干杯!
我正在尝试实现 org.apache.spark.mllib.stat.KernelDensity
的聚合版本以同时估计多个分布的概率密度函数。
我们的想法是拥有一个包含 2 列的数据框:一列用于组名,第二列包含单变量观察值(将有 1000 组,因此需要并发处理)。
我的想法是这样的(pdf
列将包含一个包含 PDF 值的数组):
> val getPdf = new PDFGetter(sparkContext)
> df_with_group_and_observation_columns.groupBy("group").agg(getPdf(col("observations")).as("pdf")).show()
我已经实现了一个用户定义的聚合函数来(希望)做到这一点。 我对当前的实施有 2 个问题,正在寻求您的建议:
- 显然无法在 UDAF 的
evaluate()
函数中重复使用sparkContext
对象。一旦 UDAF 尝试访问 sparkContext 对象(请参阅下面的详细信息),我目前会收到java.io.NotSerializableException
。 ==> 这正常吗?关于如何解决这个问题有什么想法吗? - 我当前的 UDAF 实现将从数据框(分布式)中获取每个组的所有观察值,将它们放入
Seq()
(WrappedArray)中,然后尝试 运行parallelize()
在每个组的Seq()
上重新分配观察结果。这似乎效率很低。 ==> 有没有办法让 UDAF 在 运行 时间内直接 "give" 每个组的 "sub-RDD" 到它的每个evaluate()
函数?
下面是我到目前为止的一个完整示例(不要介意 return 值是字符串而不是数组,我只是想看看是否可以让内核密度在 UDAF 中工作现在):
Spark context available as 'sc' (master = local[*], app id = local-1514639826952).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
scala> sc.toString
res27: String = org.apache.spark.SparkContext@2a96ed1b
scala> val df = Seq(("a", 1.0), ("a", 1.5), ("a", 2.0), ("a", 1.8), ("a", 1.1), ("a", 1.2), ("a", 1.9), ("a", 1.3), ("a", 1.2), ("a", 1.9), ("b", 10.0), ("b", 20.0), ("b", 11.0), ("b", 18.0), ("b", 13.0), ("b", 16.0), ("b", 15.0), ("b", 12.0), ("b", 18.0), ("b", 11.0)).toDF("group", "val")
scala> val getPdf = new PDFGetter(sc)
scala> df.groupBy("group").agg(getPdf(col("val")).as("pdf")).show()
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2a96ed1b)
- field (class: PDFGetter, name: sc, type: class org.apache.spark.SparkContext)
- object (class PDFGetter, PDFGetter@38649ca3)
...
请参阅下面的 UDAF 定义(否则效果很好):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.{ListBuffer, ArrayBuffer}
import org.apache.spark.mllib.stat.KernelDensity
class PDFGetter(var sc: org.apache.spark.SparkContext) extends UserDefinedAggregateFunction {
// Define the schema of the input data,
// intermediate processing (deals with each individual observation within each group)
// and return type of the UDAF
override def inputSchema: StructType = StructType(StructField("result_dbl", DoubleType) :: Nil)
override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)
override def dataType: DataType = StringType
// The UDAF will always return the same results
// given the same inputs
override def deterministic: Boolean = true
// How to initialize the intermediate processing buffer
// for each group
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Array.emptyDoubleArray
}
// What to do with each new row within the group
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
var values = new ListBuffer[Double]()
values.appendAll(buffer.getAs[List[Double]](0))
val newValue = input.getDouble(0)
values.append(newValue)
buffer.update(0, values)
}
// How to merge 2 buffers located on 2 separate
// executor hosts or JVMs
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
var values = new ListBuffer[Double]()
values ++= buffer1.getAs[List[Double]](0)
values ++= buffer2.getAs[List[Double]](0)
buffer1.update(0, values)
}
// What to do with the data once intermediate processing
// is completed
override def evaluate(buffer: Row): String = {
// Get the observations
val observations = buffer.getSeq[Double](0) // Or val observations = buffer.getAs[Seq[Double]](0) // Returns a WrappedArray either way
//observations.toString
// Calculate the bandwidth
val nObs = observations.size.toDouble
val mean = observations.sum / nObs
val stdDev = Math.sqrt(observations.map(x => Math.pow(x - mean, 2.0) ).sum / nObs)
val bandwidth = stdDev / 2.5
//bandwidth.toString
// Kernel Density
// From the example at http://spark.apache.org/docs/latest/api/java/index.html#org.apache.spark.sql.Dataset
// val sample = sc.parallelize(Seq(0.0, 1.0, 4.0, 4.0))
// val kd = new KernelDensity()
// .setSample(sample)
// .setBandwidth(3.0)
// val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
// Get the observations as an rdd (required by KernelDensity.setSample)
sc.toString // <==== This fails
val observationsRDD = sc.parallelize(observations)
// Create a new Kernel density object
// for these observations
val kd = new KernelDensity()
kd.setSample(observationsRDD)
kd.setBandwidth(bandwidth)
// Create the points at which
// the PDF will be estimated
val minObs = observations.min
val maxObs = observations.max
val nPoints = Math.min(nObs/2, 1000.0).toInt
val increment = (maxObs - minObs) / nPoints.toDouble
val points = new Array[Double](nPoints)
for( i <- 0 until nPoints){
points(i) = minObs + i.toDouble * increment;
}
// Estimate the PDF and return
val pdf = kd.estimate(points)
pdf.toString
}
}
很抱歉拖了这么久post,但我觉得这件事很棘手,所以我想所有的细节都会对那里的任何帮助者有用。
谢谢!
这是行不通的。你不能:
- 在执行器上访问
SparkContext
、SparkSession
、SQLContext
(其中调用evaluate
)。 - 在执行器上访问或创建分布式数据结构。
回答可能的后续问题 - 没有解决方法。这是一个核心设计决策,是 Spark 设计的基础。
我能够通过使用 Kernel Density 的另一种实现来消除对 sc.parallelize(observations)
的需求,它不需要通过 RDD 提供观察结果,而是一个简单的 Array[Double]
(因此不分发) ).
详情请见以下链接:
http://haifengl.github.io/smile/index.html
https://github.com/haifengl/smile
注意:对于那些想要了解 UDAF 的人来说,上面的代码仍然是一个不错的示例 - 只需删除构造函数中的参数 sc
并确保不要尝试使用 SparkContext
在任何 UDAF 函数中。
干杯!