基于级联的烫伤(旧版本)计数器

Scalding (older versions) counters based on cascading

scalding 的旧版本中,其 API 中仍然没有引入 countersHadoop Counters In Scalding 建议如何在烫伤中回退到级联计数器

def addCounter(pipe : Pipe, group : String, counter : String) = {

  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_], 
        functionCall : FunctionCall[Any]) {

          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
            functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
          } catch {
            case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
          }
      }.discard('addCounter)
    }
  )
}

然而,当我尝试时,我得到:

Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^

我错过了什么吗? 我用的烫版:0.8.7

.discard是一个烫伤命令,因此应该与代码块中的另一个烫伤命令.each处于同一级别。尝试将其放在最后一个右括号“)”之后。 (您发布的代码中的倒数第二行。)

在这里,操作链接到 RichPipe 管道,首先是 each,然后是 discard:

pipe.each(...){predicate}.discard(...)