如何覆盖 spark map 函数中的设置和清理方法

How to override setup and cleanup methods in spark map function

假设有以下map reduce作业

映射器:

setup() 初始化一些状态

map() 向状态添加数据,无输出

cleanup() 将状态输出到上下文

减速器

将所有状态聚合到一个输出中

如何在spark中实现这样的工作?

补充问题:这样的工作如何在烫洗中实现? 我正在寻找以某种方式使方法重载的示例...

Spark map 不提供 Hadoop setupcleanup 的等价物。它假设每个调用都是独立的并且没有副作用。

您可以获得的最接近的等价物是使用简化模板将所需的逻辑放入 mapPartitionsmapPartitionsWithIndex 中:

rdd.mapPartitions { iter => {
   ... // initalize state
   val result = ??? // compute result for iter
   ... // perform cleanup
   ... // return results as an Iterator[U]
}}

在 scala 中设置的标准方法是使用惰性 val:

lazy val someSetupState = { .... }
data.map { x =>
  useState(someSetupState, x)
  ...

只要 someSetupState 可以在任务上实例化(即它不使用提交节点的某些本地磁盘),以上内容就可以工作。这不涉及清理。对于清理,scalding有一个方法:

    TypedPipe[T]#onComplete(fn: () => Unit): TypedPipe[T]

这是每个任务最后的 运行。与映射示例类似,您可以关闭:

    data.map { x =>
      useState(someSetupState, x)
    }
    .onComplete { () =>
      someSetupState.shutdown()
    }

我不知道 spark 的等价物。