在 Spark Dataframe 处理中使用 Scala Breeze 包线程安全吗?
Is the Scala Breeze package thread safe for use in Spark Dataframe processing?
对单例变量使用 ThreadLocal 是否使它们在 Spark Dataframe 处理框架中使用时是线程安全的? Breeze fourierTr 函数使用 ThreadLocal,它似乎给我带来了问题。
我正在为 assemble 多维表构建一个应用程序,并在各个维度上计算 FFT。
val r = df.rdd.flatMap{ row =>
// scrub the input, format data into coordinates with a value
// create a key corresponding to a slice through the data
// that will get processed in the next step
}
.groupByKey.flatMap{ case( sliceKey, coordinateList ) =>
// note the vector length is variable
val buf = new Array[Complex]( lengthOfVector )
// fill buffer with values from data structure slice
fourierTr( new DenseVector(buf) )
}
注意,这只是伪代码。我已经删除了很多真实的代码来做一个简洁的例子。
重点是fourierTr的调用。当我在我的本地开发机器上 运行 一切正常时,我得到了预期的结果。但是,当我转移到更大的多核机器时,出现以下异常:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
at breeze.signal.fourierTr$$anon.apply(fourierTr.scala:69)
at breeze.signal.fourierTr$$anon.apply(fourierTr.scala:62)
at breeze.generic.UFunc$class.apply(UFunc.scala:48)
at breeze.signal.fourierTr$.apply(fourierTr.scala:25)
起初我认为这可能是由于我的开发机器和集群(即使用 AWS)之间的包版本控制差异所致。在确保所有相关的 jar 版本都匹配后,我仍然遇到同样的问题。然后我确定如果我用
启动应用程序 运行 就没问题
spark-submit --master local[1] ...
但是,如果我用
启动它
spark-submit --master local[2] ...
或任何大于 2 的节点数然后我会得到异常。这让我怀疑某些记忆以某种方式被破坏了。所以我开始挖掘库源代码。
入口点在fourierTr.scala
implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
def apply(v: DenseVector[Complex]) = {
//reformat for input: note difference in format for input to real fft
val tempArr = denseVectorCToTemp(v)
//actual action
val fft_instance = getD1DInstance(v.length)
fft_instance.complexForward( tempArr ) //does operation in place
//reformat for output
tempToDenseVector(tempArr)
}
}
}
在 JTransformsSupport.scala 中推送到 getD1DInstance 我发现 ...
object JTransformsSupport {
//maintain instance of transform to eliminate repeated initialization
private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]
def getD1DInstance(length: Int): DoubleFFT_1D = {
if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
else {
fft_instD1D.set((length, new DoubleFFT_1D(length)))
fft_instD1D.get()._2
}
}
注意它正在修改一个共享变量fft_instD1D。我对 ThreadLocal 类型不是很熟悉,但这似乎是为了使 class 线程安全。但是,我更改了我的代码以将 DoubleFFT_1D 对象实例化为堆栈变量,然后我直接调用了所有低级例程(例如,而不是调用 fourierTr 我调用了 DoubleFFT_1D.complexForward)。
进行此更改后,无论 Spark 使用的节点数量如何,都不再发生异常。所以看起来 Fourier t运行sform 库对 ThreadLocal 的使用是罪魁祸首。
我想知道其他认为自己是Scala/Breeze/Spark专家的人是否同意我的结论?
如果不正确,请建议如何在 Spark Dataframe 处理的上下文中正确使用 Breeze(特别是 fourierTr)。
如果这是正确的结论,那么我还有一些后续问题...
- 我认为可以从内部调用 Breeze 函数是错误的吗
数据帧处理管道?如果不打算从 Dataframe 处理管道调用 Breeze,那么是否有一种标准方法来包装库以便可以从管道调用它,或者通常有必要做我所做的并重写部分库消除共享变量的功能?
- 如果要从 Dataframe 管道调用 Breeze,则
这看起来像是 Breeze 库中的错误,还是
ThreadLocal class 的实现? ... IE。谁的关注
我应该把这个带到吗?
请向 Breeze 提交错误:github。com/scalanlp/breeze/issues
Breeze 努力做到线程安全(这就是为什么我们使用 ThreadLocal 让每个线程有一个转换实例),但这里出了点问题。 (ThreadLocal 代码在我看来完全没问题,但后来我又写了它。)
对单例变量使用 ThreadLocal 是否使它们在 Spark Dataframe 处理框架中使用时是线程安全的? Breeze fourierTr 函数使用 ThreadLocal,它似乎给我带来了问题。
我正在为 assemble 多维表构建一个应用程序,并在各个维度上计算 FFT。
val r = df.rdd.flatMap{ row =>
// scrub the input, format data into coordinates with a value
// create a key corresponding to a slice through the data
// that will get processed in the next step
}
.groupByKey.flatMap{ case( sliceKey, coordinateList ) =>
// note the vector length is variable
val buf = new Array[Complex]( lengthOfVector )
// fill buffer with values from data structure slice
fourierTr( new DenseVector(buf) )
}
注意,这只是伪代码。我已经删除了很多真实的代码来做一个简洁的例子。
重点是fourierTr的调用。当我在我的本地开发机器上 运行 一切正常时,我得到了预期的结果。但是,当我转移到更大的多核机器时,出现以下异常:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
at breeze.signal.fourierTr$$anon.apply(fourierTr.scala:69)
at breeze.signal.fourierTr$$anon.apply(fourierTr.scala:62)
at breeze.generic.UFunc$class.apply(UFunc.scala:48)
at breeze.signal.fourierTr$.apply(fourierTr.scala:25)
起初我认为这可能是由于我的开发机器和集群(即使用 AWS)之间的包版本控制差异所致。在确保所有相关的 jar 版本都匹配后,我仍然遇到同样的问题。然后我确定如果我用
启动应用程序 运行 就没问题spark-submit --master local[1] ...
但是,如果我用
启动它spark-submit --master local[2] ...
或任何大于 2 的节点数然后我会得到异常。这让我怀疑某些记忆以某种方式被破坏了。所以我开始挖掘库源代码。
入口点在fourierTr.scala
implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
def apply(v: DenseVector[Complex]) = {
//reformat for input: note difference in format for input to real fft
val tempArr = denseVectorCToTemp(v)
//actual action
val fft_instance = getD1DInstance(v.length)
fft_instance.complexForward( tempArr ) //does operation in place
//reformat for output
tempToDenseVector(tempArr)
}
}
}
在 JTransformsSupport.scala 中推送到 getD1DInstance 我发现 ...
object JTransformsSupport {
//maintain instance of transform to eliminate repeated initialization
private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]
def getD1DInstance(length: Int): DoubleFFT_1D = {
if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
else {
fft_instD1D.set((length, new DoubleFFT_1D(length)))
fft_instD1D.get()._2
}
}
注意它正在修改一个共享变量fft_instD1D。我对 ThreadLocal 类型不是很熟悉,但这似乎是为了使 class 线程安全。但是,我更改了我的代码以将 DoubleFFT_1D 对象实例化为堆栈变量,然后我直接调用了所有低级例程(例如,而不是调用 fourierTr 我调用了 DoubleFFT_1D.complexForward)。
进行此更改后,无论 Spark 使用的节点数量如何,都不再发生异常。所以看起来 Fourier t运行sform 库对 ThreadLocal 的使用是罪魁祸首。
我想知道其他认为自己是Scala/Breeze/Spark专家的人是否同意我的结论?
如果不正确,请建议如何在 Spark Dataframe 处理的上下文中正确使用 Breeze(特别是 fourierTr)。
如果这是正确的结论,那么我还有一些后续问题...
- 我认为可以从内部调用 Breeze 函数是错误的吗 数据帧处理管道?如果不打算从 Dataframe 处理管道调用 Breeze,那么是否有一种标准方法来包装库以便可以从管道调用它,或者通常有必要做我所做的并重写部分库消除共享变量的功能?
- 如果要从 Dataframe 管道调用 Breeze,则 这看起来像是 Breeze 库中的错误,还是 ThreadLocal class 的实现? ... IE。谁的关注 我应该把这个带到吗?
请向 Breeze 提交错误:github。com/scalanlp/breeze/issues
Breeze 努力做到线程安全(这就是为什么我们使用 ThreadLocal 让每个线程有一个转换实例),但这里出了点问题。 (ThreadLocal 代码在我看来完全没问题,但后来我又写了它。)