Scalding TypedPipe API 外部操作模式

Scalding TypedPipe API External Operations pattern

我有一本 Antonios Chalkiopoulos 的 Programming MapReduce with Scalding。在书中,他讨论了 Scalding 代码的外部操作设计模式。你可以在他的网站 here. I have made a choice to use the Type Safe API 上看到一个例子。自然地,这会带来新的挑战,但我更喜欢它而不是字段 API,这是我之前提到的书中和网站中大量讨论的内容。

我想知道人们是如何使用类型安全 API 实现外部操作模式的。我的初步实现如下:

I create a class that extends com.twitter.scalding.Job which will serve as my Scalding job class where I will 'manage arguments, define taps, and use external operations to construct data processing pipelines'.

I create an object where I define my functions to be used in the Type Safe pipes. Because the Type Safe pipes take as arguments a function, I can then just pass the functions in the object as arguments to the pipes.

这将创建如下所示的代码:

class MyJob(args: Args) extends Job(args) {

  import MyOperations._

  val input_path = args(MyJob.inputArgPath)
  val output_path = args(MyJob.outputArgPath)

  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
    case _ => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
  }

  val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => WritableSequenceFile[LongWritable, Text](output_path)
    case _ => TypedTsv[(LongWritable, Text)](output_path)
  }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.map(convertTextToEither).fork
  validatedEvents.filter(isEvent).map(removeEitherWrapper).write(eventOutput)
}

object MyOperations {

  def convertTextToEither(v: (LongWritable, Text)): (LongWritable, Either[Text, Event]) = {
    ...
  }

  def isEvent(v: (LongWritable, Either[Text, Event])): Boolean = {
    ...
  }

  def removeEitherWrapper(v: (LongWritable, Either[Text, Event])): (LongWritable, Text) = {
    ...
  }
}

如您所见,传递给 Scalding 类型安全操作的函数与作业本身保持分离。虽然这不像所呈现的外部操作模式那样 'clean',但这是编写此类代码的快速方法。此外,我可以使用 JUnitRunner 进行作业级集成测试,使用 ScalaTest 进行功能级单元测试。

这个 post 的主要目的是询问人们是如何做这种事情的? Internet 上有关 Scalding Type Safe API 的文档很少。是否有更多功能性 Scala 友好的方法来执行此操作?我在这里缺少设计模式的关键组件吗?我对此感到有点紧张,因为有了 Fields API,您可以使用 ScaldingTest 在管道上编写单元测试。据我所知,您不能使用 TypedPipes 做到这一点。如果 Scalding 类型安全 API 或您如何创建可重用、模块化和可测试的类型安全 API 代码是否有普遍认可的模式,请告诉我。感谢您的帮助!

Antonios 回复后更新 2

感谢您的回复。这基本上就是我一直在寻找的答案。我想继续谈话。正如我评论的那样,我在您的回答中看到的主要问题是此实现需要特定类型的实现,但是如果类型在您的整个工作中发生变化怎么办?我研究了这段代码,它似乎可以工作,但似乎被黑了。

def self: TypedPipe[Any]

def testingPipe: TypedPipe[(LongWritable, Text)] = self.map(
    (firstVar: Any) => {
        val tester = firstVar.asInstanceOf[(LongWritable, Text)]
        (tester._1, tester._2)
    }
)

这样做的好处是我声明了一个 self 的实现,但缺点是这种丑陋的类型转换。此外,我还没有使用更复杂的管道对此进行深入测试。所以基本上,您对如何处理类型有何看法,因为它们仅通过 cleanliness/brevity 的一个自我实现发生变化?

我不确定您看到的代码片段有什么问题,以及为什么您认为它是 "less clean"。我觉得还不错。

至于单元测试作业使用类型 API 问题,看看 JobTest,它似乎正是你要找的。

Scala extension methods 是使用隐式 classes 实现的。 您向编译器添加了将 TypedPipe 转换为包含外部操作的(包装器)class 的功能:

import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import cascading.flow.FlowDef

class MyJob(args: Args) extends Job(args) {

  implicit class MyOperationsWrapper(val self: TypedPipe[Double]) extends MyOperations with Serializable

  val pipe = TypedPipe.from(TypedTsv[Double](args("input")))

  val result = pipe
    .operation1
    .operation2(x => x*2)
    .write(TypedTsv[Double](args("output")))

}

trait MyOperations {

  def self: TypedPipe[Double]

  def operation1(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x =>
      println(s"Input: $x")
      x / 100
    }

  def operation2(datafn:Double => Double)(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x=>
      val result = datafn(x)
      println(s"Result: $result")
      result
    }

}

import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

object MyRunner extends App {

  ToolRunner.run(new Configuration(), new Tool, (classOf[MyJob].getName :: "--local" ::
    "--input" :: "doubles.tsv" ::
    "--output":: "result.tsv" :: args.toList).toArray)

}

关于如何跨管道管理类型,我的建议是尝试找出一些有意义和用例 classes 的基本类型。要使用您的示例,我会将方法 convertTextToEither 重命名为 extractEvents :

case class LogInput(l : Long, text: Text)
case class Event(data: String)
def extractEvents( line : LogInput ): TypedPipe[Event] =
  self.filter( isEvent(line) )
      .map ( getEvent(line.text) ) 

那么你会

  • LogInputOperations LogInput 类型
  • EventOperations 对于 Event 类型