Spark 1.5.2 DataFramesUDF 在对象重新使用时避免竞争条件
Spark 1.5.2 DataFramesUDF Avoid Race Condition upon Object Re-Usage
这里的问题是如何重用 UDF 的对象同时避免竞争条件?
我在我的 spark 应用程序中使用 UDF,由于竞争条件,单元测试似乎是不确定的。有时他们通过有时他们失败...
我试图通过创建对象并将它们传递给 UDF 来强制重新使用对象以提高效率。然而,似乎单独的 "tests" 共享相同的 spark 上下文和 JVM 正在使用这些对象并导致错误。
def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={
sdfOut.format(sdfIn.parse(input))
}
val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat))
val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat))
有时,当我 运行 我的单元测试时,我会收到此函数的以下错误:
17/01/13 11:45:45 ERROR Executor: Exception in task 0.0 in stage 2.0
(TID 2) java.lang.NumberFormatException: multiple points at
sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at
java.lang.Double.parseDouble(Double.java:538) at
java.text.DigitList.getDouble(DigitList.java:169) at
java.text.DecimalFormat.parse(DecimalFormat.java:2056) at
java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867) at
java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514) at
java.text.DateFormat.parse(DateFormat.java:364) at
com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF$.reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)
我是这样使用函数的:
val df = df2
.filter(
datediff(
to_date(partitionToDateUDF($"dt"))
,to_date(dTStampToDate($"d_last_seen"))
) < 90
)
调试后发现输入 "df2" 为:
+-----------+--------+-------------------------+--------------------------------+
|d_last_seen| dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02 |2016-11-02 |
| 2016/11/01|20161102|2016-11-02 |2016-11-01 |
+-----------+--------+-------------------------+--------------------------------+
我使用 conf.setMaster("local[2]"),可能是 spark 使用线程,因此在本地 运行ning 时共享相同的 JVM,但是部署时不会发生这种情况因为单独的执行程序将有自己的 JVM,因此有自己的对象实例化?
SimpleDateFormat
不是 thread-safe(参见示例 Why is Java's SimpleDateFormat not thread-safe?)。这意味着如果你在任何 UDF 中使用它(即使是在一个单独的 Spark 作业中)你可能会得到意想不到的结果,因为 spark 会在多个 tasks which 运行 on单独的 threads 最终有多个线程同时访问它。这对于本地模式和实际的分布式集群都是如此 - 每个执行程序上的多个线程将使用一个副本。
要克服这个问题 - 只需使用不同的格式化程序,是 thread-safe,例如乔达的 DateTimeFormatter
.
这里的问题是如何重用 UDF 的对象同时避免竞争条件?
我在我的 spark 应用程序中使用 UDF,由于竞争条件,单元测试似乎是不确定的。有时他们通过有时他们失败...
我试图通过创建对象并将它们传递给 UDF 来强制重新使用对象以提高效率。然而,似乎单独的 "tests" 共享相同的 spark 上下文和 JVM 正在使用这些对象并导致错误。
def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={
sdfOut.format(sdfIn.parse(input))
}
val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat))
val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat))
有时,当我 运行 我的单元测试时,我会收到此函数的以下错误:
17/01/13 11:45:45 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.NumberFormatException: multiple points at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at java.text.DigitList.getDouble(DigitList.java:169) at java.text.DecimalFormat.parse(DecimalFormat.java:2056) at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867) at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514) at java.text.DateFormat.parse(DateFormat.java:364) at com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF$.reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)
我是这样使用函数的:
val df = df2
.filter(
datediff(
to_date(partitionToDateUDF($"dt"))
,to_date(dTStampToDate($"d_last_seen"))
) < 90
)
调试后发现输入 "df2" 为:
+-----------+--------+-------------------------+--------------------------------+
|d_last_seen| dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02 |2016-11-02 |
| 2016/11/01|20161102|2016-11-02 |2016-11-01 |
+-----------+--------+-------------------------+--------------------------------+
我使用 conf.setMaster("local[2]"),可能是 spark 使用线程,因此在本地 运行ning 时共享相同的 JVM,但是部署时不会发生这种情况因为单独的执行程序将有自己的 JVM,因此有自己的对象实例化?
SimpleDateFormat
不是 thread-safe(参见示例 Why is Java's SimpleDateFormat not thread-safe?)。这意味着如果你在任何 UDF 中使用它(即使是在一个单独的 Spark 作业中)你可能会得到意想不到的结果,因为 spark 会在多个 tasks which 运行 on单独的 threads 最终有多个线程同时访问它。这对于本地模式和实际的分布式集群都是如此 - 每个执行程序上的多个线程将使用一个副本。
要克服这个问题 - 只需使用不同的格式化程序,是 thread-safe,例如乔达的 DateTimeFormatter
.