Apache Spark:处理 RDD 中的 Option/Some/None
Apache Spark: dealing with Option/Some/None in RDDs
我正在映射一个 HBase table,为每个 HBase 行生成一个 RDD 元素。但是,有时该行包含错误数据(在解析代码中抛出 NullPointerException),在这种情况下我只想跳过它。
我有我的初始映射器 return 一个 Option
来指示它 returns 0 或 1 个元素,然后过滤 Some
,然后获取包含的值:
// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
map( tuple => getData(tuple._2) ).
filter( {case Some(y) => true; case None => false} ).
map( _.get ).
// ... more RDD operations with the good data
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
try {
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
Some( ( id, ( List(x),
// more stuff ...
) ) )
} catch {
case e: NullPointerException => {
logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
None
}
}
}
有没有更惯用的更短的方法来做到这一点?我觉得这看起来很乱,无论是 getData()
还是 map.filter.map
我正在做的舞蹈。
也许 flatMap
可以工作(在 Seq
中生成 0 或 1 个项目),但我不希望它展平我在 map 函数中创建的元组,只是消除空瓶。
如果将 getData
更改为 return a scala.util.Try
,则可以大大简化转换。像这样的东西可以工作:
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
val tr = util.Try{
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
( id, ( List(x)
// more stuff ...
) )
}
tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
tr
}
然后你的转换可以像这样开始:
myRDD.
flatMap(tuple => getData(tuple._2).toOption)
如果您的 Try
是 Failure
,它将通过 toOption
变成 None
,然后作为 flatMap
逻辑的一部分被删除。到那时,你在转换中的下一步将只处理成功的案例,无论底层类型是 return 从 getData
编辑而没有包装(即没有 Option
)
如果您可以删除数据,那么您可以使用 mapPartitions
。这是一个示例:
import scala.util._
val mixedData = sc.parallelize(List(1,2,3,4,0))
mixedData.mapPartitions(x=>{
val foo = for(y <- x)
yield {
Try(1/y)
}
for{goodVals <- foo.partition(_.isSuccess)._1}
yield goodVals.get
})
如果你想看到错误的值,那么你可以使用 accumulator
或者像以前一样记录。
您的代码将如下所示:
val output = myRDD.
mapPartitions( tupleIter => getCleanData(tupleIter) )
// ... more RDD operations with the good data
def getCleanData(iter: Iter[???]) = {
val triedData = getDataInTry(iter)
for{goodVals <- triedData.partition(_.isSuccess)._1}
yield goodVals.get
}
def getDataInTry(iter: Iter[???]) = {
for(r <- iter) yield {
Try{
val key = r._2.getRow
var id = "(unk)"
var x = -1L
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
}
}
}
另一种经常被忽视的方法是使用 collect(PartialFunction pf)
,这意味着 'select' 或 'collect' RDD 中在偏函数中定义的特定元素。
代码如下所示:
val output = myRDD.collect{case Success(tuple) => tuple }
def getData(r: Result):Try[(String, List[X])] = Try {
val id = Bytes.toString(key, 0, 11)
val x = Long.MaxValue - Bytes.toLong(key, 11)
(id, List(x))
}
我正在映射一个 HBase table,为每个 HBase 行生成一个 RDD 元素。但是,有时该行包含错误数据(在解析代码中抛出 NullPointerException),在这种情况下我只想跳过它。
我有我的初始映射器 return 一个 Option
来指示它 returns 0 或 1 个元素,然后过滤 Some
,然后获取包含的值:
// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
map( tuple => getData(tuple._2) ).
filter( {case Some(y) => true; case None => false} ).
map( _.get ).
// ... more RDD operations with the good data
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
try {
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
Some( ( id, ( List(x),
// more stuff ...
) ) )
} catch {
case e: NullPointerException => {
logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
None
}
}
}
有没有更惯用的更短的方法来做到这一点?我觉得这看起来很乱,无论是 getData()
还是 map.filter.map
我正在做的舞蹈。
也许 flatMap
可以工作(在 Seq
中生成 0 或 1 个项目),但我不希望它展平我在 map 函数中创建的元组,只是消除空瓶。
如果将 getData
更改为 return a scala.util.Try
,则可以大大简化转换。像这样的东西可以工作:
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
val tr = util.Try{
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
( id, ( List(x)
// more stuff ...
) )
}
tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
tr
}
然后你的转换可以像这样开始:
myRDD.
flatMap(tuple => getData(tuple._2).toOption)
如果您的 Try
是 Failure
,它将通过 toOption
变成 None
,然后作为 flatMap
逻辑的一部分被删除。到那时,你在转换中的下一步将只处理成功的案例,无论底层类型是 return 从 getData
编辑而没有包装(即没有 Option
)
如果您可以删除数据,那么您可以使用 mapPartitions
。这是一个示例:
import scala.util._
val mixedData = sc.parallelize(List(1,2,3,4,0))
mixedData.mapPartitions(x=>{
val foo = for(y <- x)
yield {
Try(1/y)
}
for{goodVals <- foo.partition(_.isSuccess)._1}
yield goodVals.get
})
如果你想看到错误的值,那么你可以使用 accumulator
或者像以前一样记录。
您的代码将如下所示:
val output = myRDD.
mapPartitions( tupleIter => getCleanData(tupleIter) )
// ... more RDD operations with the good data
def getCleanData(iter: Iter[???]) = {
val triedData = getDataInTry(iter)
for{goodVals <- triedData.partition(_.isSuccess)._1}
yield goodVals.get
}
def getDataInTry(iter: Iter[???]) = {
for(r <- iter) yield {
Try{
val key = r._2.getRow
var id = "(unk)"
var x = -1L
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
}
}
}
另一种经常被忽视的方法是使用 collect(PartialFunction pf)
,这意味着 'select' 或 'collect' RDD 中在偏函数中定义的特定元素。
代码如下所示:
val output = myRDD.collect{case Success(tuple) => tuple }
def getData(r: Result):Try[(String, List[X])] = Try {
val id = Bytes.toString(key, 0, 11)
val x = Long.MaxValue - Bytes.toLong(key, 11)
(id, List(x))
}