如何在烫金工作结束时只执行一次操作?

how to perform an operation one time only at the end of a scalding job?

我阅读了 scalding groupAll 文档:

   /**
    * Group all tuples down to one reducer.
    * (due to cascading limitation).
    * This is probably only useful just before setting a tail such as Database
    * tail, so that only one reducer talks to the DB.  Kind of a hack.
    */
    def groupAll: Pipe = groupAll { _.pass }

这让我有充分的理由相信,如果我 pipe 我的最终 write 结果进入一个 statusUpdater 管道,它只是更新了我的工作成功完成的一些数据库,那么它将是作业完成后执行一次,但我在

中尝试过

以下代码示例:

import Dsl._
somepipe
  .addCount
  .toPipe(outputSchema)
  .write(Tsv(outputPath, outputSchema, writeHeader = true))(flowDef, mode)
  .groupAll.updateResultStatus

  implicit class StatusResultsUpdater(pipe: Pipe) {
    def updateResultStatus: Pipe = {
      println("DO THIS ONCE AFTER JOB COMPLETES!") // was printed even before the job ended! how to have it print only when job ends!?
      pipe
    }
  }

根据我使用的文档 groupAll 那么 updateResultStatus 应该是 运行 只有在作业结束后并且只有一次,为什么我看到它在作业结束之前就已经打印了语句?我错过了什么吗?我应该怎么做才能起作用?

Scalding job中的执行顺序有点棘手:

  1. 执行作业 class 中的初始化语句并构建操作树(连接 Pipes、Taps 等)
  2. 树被移交给优化器。执行计划已创建
  3. 作业开始执行。 Hadoop作业的Map和Reduce步骤按计划启动
  4. 主程序等待一切完成并退出。

根据您的代码,println 语句将在第 1 步执行。