如何覆盖 spark map 函数中的设置和清理方法
How to override setup and cleanup methods in spark map function
假设有以下map reduce作业
映射器:
setup() 初始化一些状态
map() 向状态添加数据,无输出
cleanup() 将状态输出到上下文
减速器:
将所有状态聚合到一个输出中
如何在spark中实现这样的工作?
补充问题:这样的工作如何在烫洗中实现?
我正在寻找以某种方式使方法重载的示例...
Spark map
不提供 Hadoop setup
和 cleanup
的等价物。它假设每个调用都是独立的并且没有副作用。
您可以获得的最接近的等价物是使用简化模板将所需的逻辑放入 mapPartitions
或 mapPartitionsWithIndex
中:
rdd.mapPartitions { iter => {
... // initalize state
val result = ??? // compute result for iter
... // perform cleanup
... // return results as an Iterator[U]
}}
在 scala 中设置的标准方法是使用惰性 val:
lazy val someSetupState = { .... }
data.map { x =>
useState(someSetupState, x)
...
只要 someSetupState
可以在任务上实例化(即它不使用提交节点的某些本地磁盘),以上内容就可以工作。这不涉及清理。对于清理,scalding有一个方法:
TypedPipe[T]#onComplete(fn: () => Unit): TypedPipe[T]
这是每个任务最后的 运行。与映射示例类似,您可以关闭:
data.map { x =>
useState(someSetupState, x)
}
.onComplete { () =>
someSetupState.shutdown()
}
我不知道 spark 的等价物。
假设有以下map reduce作业
映射器:
setup() 初始化一些状态
map() 向状态添加数据,无输出
cleanup() 将状态输出到上下文
减速器:
将所有状态聚合到一个输出中
如何在spark中实现这样的工作?
补充问题:这样的工作如何在烫洗中实现? 我正在寻找以某种方式使方法重载的示例...
Spark map
不提供 Hadoop setup
和 cleanup
的等价物。它假设每个调用都是独立的并且没有副作用。
您可以获得的最接近的等价物是使用简化模板将所需的逻辑放入 mapPartitions
或 mapPartitionsWithIndex
中:
rdd.mapPartitions { iter => {
... // initalize state
val result = ??? // compute result for iter
... // perform cleanup
... // return results as an Iterator[U]
}}
在 scala 中设置的标准方法是使用惰性 val:
lazy val someSetupState = { .... }
data.map { x =>
useState(someSetupState, x)
...
只要 someSetupState
可以在任务上实例化(即它不使用提交节点的某些本地磁盘),以上内容就可以工作。这不涉及清理。对于清理,scalding有一个方法:
TypedPipe[T]#onComplete(fn: () => Unit): TypedPipe[T]
这是每个任务最后的 运行。与映射示例类似,您可以关闭:
data.map { x =>
useState(someSetupState, x)
}
.onComplete { () =>
someSetupState.shutdown()
}
我不知道 spark 的等价物。