基于级联的烫伤(旧版本)计数器
Scalding (older versions) counters based on cascading
在 scalding
的旧版本中,其 API 中仍然没有引入 counters
。 Hadoop Counters In Scalding 建议如何在烫伤中回退到级联计数器
def addCounter(pipe : Pipe, group : String, counter : String) = {
pipe.each(() -> ('addCounter)) ( fields =>
new BaseOperation[Any](fields) with Function[Any] {
def operate(flowProcess : FlowProcess[_],
functionCall : FunctionCall[Any]) {
try {
flowProcess.asInstanceOf[HadoopFlowProcess]
.increment(group, counter, 1L)
functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
} catch {
case cce: ClassCastException =>
// HadoopFlowProcess is not available in local mode
}
}.discard('addCounter)
}
)
}
然而,当我尝试时,我得到:
Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^
我错过了什么吗?
我用的烫版:0.8.7
.discard
是一个烫伤命令,因此应该与代码块中的另一个烫伤命令.each
处于同一级别。尝试将其放在最后一个右括号“)”之后。 (您发布的代码中的倒数第二行。)
在这里,操作链接到 RichPipe 管道,首先是 each
,然后是 discard
:
pipe.each(...){predicate}.discard(...)
在 scalding
的旧版本中,其 API 中仍然没有引入 counters
。 Hadoop Counters In Scalding 建议如何在烫伤中回退到级联计数器
def addCounter(pipe : Pipe, group : String, counter : String) = {
pipe.each(() -> ('addCounter)) ( fields =>
new BaseOperation[Any](fields) with Function[Any] {
def operate(flowProcess : FlowProcess[_],
functionCall : FunctionCall[Any]) {
try {
flowProcess.asInstanceOf[HadoopFlowProcess]
.increment(group, counter, 1L)
functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
} catch {
case cce: ClassCastException =>
// HadoopFlowProcess is not available in local mode
}
}.discard('addCounter)
}
)
}
然而,当我尝试时,我得到:
Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^
我错过了什么吗? 我用的烫版:0.8.7
.discard
是一个烫伤命令,因此应该与代码块中的另一个烫伤命令.each
处于同一级别。尝试将其放在最后一个右括号“)”之后。 (您发布的代码中的倒数第二行。)
在这里,操作链接到 RichPipe 管道,首先是 each
,然后是 discard
:
pipe.each(...){predicate}.discard(...)