mutable.Buffer 不适用于类型安全的 Scalding JobTest API

mutable.Buffer does not work with Scalding JobTest for Type Safe API

我几乎完成了我的 Scalding 项目,该项目使用类型安全 API 而不是字段 API。在整个项目设置中留给我的最后一个问题是整个 Scalding 作业本身的集成测试(我已经完成了类型安全外部操作模式的单元测试耶!)。这意味着 运行 完成作业并测试我作业的各种接收器的输出。

然而,一些非常奇怪的事情正在发生。在我的

typedSink { scala.collection.mutable.Buffer[] => Unit }

似乎我的程序没有看到缓冲区或对缓冲区做任何事情,所以集成测试总是通过,即使它不应该通过。下面是工作本身和有助于阐明正在发生的事情的测试:

object MyJob {
  val inputArgPath = "input"
  val validOutputArgPath = "validOutput"
  val invalidOutputArgPath = "invalidOutput"
}

class MyJob(args: Args) extends Job(args) {

  import OperationWrappers._

  implicit lazy val uId: Some[UniqueID] = Some(UniqueID.getIDFor(flowDef))

  val inputPath: String = args(MyJob.inputArgPath)
  val validOutputPath: String = args(MyJob.validOutputArgPath)
  val invalidOutputPath: String = args(MyJob.invalidOutputArgPath)

  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](inputPath))
    case _ => TypedPipe.from(TypedTsv[(LongWritable, Text)](inputPath))
  }

  def returnOutputPipe(outputString: String): FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = {
    val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
      case m: HadoopMode => WritableSequenceFile[LongWritable, Text](outputString)
      case _ => TypedTsv[(LongWritable, Text)](outputString)
    }
    eventOutput
  }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.convertJsonToEither.forceToDisk

  validatedEvents.removeInvalidTuples.removeEitherWrapper.write(returnOutputPipe(invalidOutputPath))
  validatedEvents.keepValidTuples.removeEitherWrapper.write(returnOutputPipe(validOutputPath))

  override protected def handleStats(statsData: CascadingStats) = {
    //This is code to handle counters.
  }
}

下面是集成测试:

class MyJobTest extends FlatSpec with Matchers {
  private val LOG = LoggerFactory.getLogger(classOf[MyJobTest])

  val validEvents: List[(LongWritable, Text)] = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/validEvents.txt")).getLines().toList.map(s => {
    val eventText = new Text
    val typedFields = s.split(Constants.TAB)
    eventText.set(typedFields(1))
    (new LongWritable(typedFields(0).toLong), eventText)
  })

  "Integrate-Test: My Job" should "run test" in {

    LOG.info("Before Job Test starts.")
    JobTest(classOf[MyJob].getName)
      .arg(MyJob.inputArgPath, "input")
      .arg(MyJob.invalidOutputArgPath, "invalidOutput")
      .arg(MyJob.validOutputArgPath, "validOutput")
      .source(TypedTsv[(LongWritable, Text)]("input"), validEvents)
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("invalidOutput")) {
      (buffer: mutable.Buffer[(LongWritable, Text)]) => {
        LOG.info("This is inside the buffer1.")
        buffer.size should equal(1000000)
      }
    }
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("validOutput")) {
      (buffer: mutable.Buffer[(LongWritable, Text)]) => {
        LOG.info("This is inside the buffer2.")
        buffer.size should equal(1000000000)
      }
    }
      .run
      .finish
  }
}

最后,输出:

[INFO] --- maven-surefire-plugin:2.7:test (default-test) @ MyJob ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- scalatest-maven-plugin:1.0:test (test) @ MyJob ---
Discovery starting.
16/01/28 10:06:42 INFO jobs.MyJobTest: Before Job Test starts.
16/01/28 10:06:42 INFO property.AppProps: using app.id: A98C9B84C79348F8A7784D8247410C13
16/01/28 10:06:42 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  source: MemoryTap["NullScheme"]["0.2996348736498404"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  sink: MemoryTap["NullScheme"]["0.8393418014297485"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  sink: MemoryTap["NullScheme"]["0.20643450953780684"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  parallel execution is enabled: true
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  starting jobs: 1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  allocating threads: 1
16/01/28 10:06:42 INFO flow.FlowStep: [com.myCompany.myProject.c...] starting step: local
16/01/28 10:06:42 INFO util.Version: HV000001: Hibernate Validator 5.0.3.Final
Dumping custom counters:
rawEvent    6
validEvent  6
16/01/28 10:06:42 INFO jobs.MyJob: RawEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: ValidEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: InvalidEvents: 0
16/01/28 10:06:42 INFO jobs.MyJob: Job has valid counters and is exiting successfully.

如您所见,Logger 记录了 "Before Job Test Starts" 但 typedSink 部分内部没有任何反应。这令人沮丧,因为我的代码看起来像我看到的所有其他代码,但它不起作用。它应该无法通过测试,但一切 运行 都成功了。此外,typedSink 中的 Logger 永远不会输出。最后,如果您查看输出,您会发现它正确处理了计数器,因此 运行 正在完成作业。我花了很多时间尝试新事物,但似乎没有任何效果。希望社区能够帮助我。谢谢!

所以,虽然我对此 post 没有很好的答案,但我有适合我的答案。基本上我的问题是我正在使用 ScalaTest 运行 我的 Scalding 工作来自这个 link:Using the ScalaTest Maven plugin。这对于我的操作单元测试来说效果很好,但是在将 ScalaTest 与 JobTest 一起使用时会导致奇怪。在与 Scalding 开发人员交谈并最终承认我的团队使用 JUnitRunner 取得成功后,我决定采用它。我更改了我的 POM 以支持 JUnitRunner 并向我的测试添加了 @RunWith(classOf[JUnitRunner]) 注释。一切正常,表现得也像我想要的那样。