Akka Stream: Could not process file from Test, works fine from Main class

This is what my code looks like:

object LogFile {
  def apply(file: File, system: ActorSystem) = new LogFile(file, system)
  val maxBytesPerLine = 1500

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")
    LogFile(new File("/Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log"), system).process()
  }
}

class LogFile(file: File, implicit val system: ActorSystem) {
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] = Source.synchronousFile(file)

  // todo (harit): what should be maximumFrameLength
  val flow: Flow[ByteString, String, Unit] = Flow[ByteString]
    .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = LogFile.maxBytesPerLine, allowTruncation = true))
    .map(_.utf8String)

  def process() = {
    logger.debug(s"processing $file")
    source.via(flow).runForeach(println)
  }
}

When I run this, I get the following output:

16:07:58.062 [main] DEBUG com.learner.processor.LogFile - processing /Users/harit/workspace/LogProcessor/trunk/test-log-files/bc.envision.0001.log
Nov 13 14:50:29 [151.162.175.11] %CACHEFLOWELFF-4-: date="2013-11-13",time="19:49:06",time-taken="1",c-ip="151.162.175.2",s-action="FAILED",s-ip="151.162.175.11",s-hierarchy="-",s-supplier-name="-",s-sitename="SG-SOCKS-Service",cs-user="-",cs-username="-",cs-auth-group="-",cs-categories="unavailable;unavailable;unavailable;unavailable;unavailable",cs-method="unknown",cs-host="-",cs-uri="-",cs-uri-scheme="-",cs-uri-port="0",cs-uri-path="/",cs-uri-query="-",cs-uri-extension="-",cs(Referer)="-",cs(User-Agent)="-",cs-bytes="0",sc-status="0",sc-bytes="0",sc-filter-result="DENIED",sc-filter-category="unavailable",x-virus-id="-",x-exception-id="internal_error",rs(Content-Type)="-",duration="0",s-supplier-ip="-",cs(Cookie)="-",s-computername="ATLTCP-WP01",s-port="1080",cs-uri-stem="-",cs-version="-",
Nov 13 14:50:29 [151.162.175.11] %CACHEFLOWELFF-4-: date="2013-11-13",time="19:49:06",time-taken="6",c-ip="170.227.133.15",s-action="TCP_NC_MISS",s-ip="151.162.175.11",s-hierarchy="-",s-supplier-name="s.youtube.com",s-sitename="SG-HTTP-Service",cs-user="NA\os3281",cs-username="os3281",cs-auth-group="na\SEC-InternetAccess",cs-categories="Mixed Content/Potentially Adult;Audio/Video Clips",cs-method="GET",cs-host="s.youtube.com",cs-uri="http://s.youtube.com/stream_204?el=detailpage&cpn=5IWD86_4oayP2E4P&scoville=1&event=streamingstats&vps=652.509:PL&docid=KFwjibi-JRU&df=652.509:474&fmt=134&ns=yt&ei=_NSDUuDMFMXc6QaTnYCwDA",cs-uri-scheme="http",cs-uri-port="80",cs-uri-path="/stream_204",cs-uri-query="?el=detailpage&cpn=5IWD86_4oayP2E4P&scoville=1&event=streamingstats&vps=652.509:PL&docid=KFwjibi-JRU&df=652.509:474&fmt=134&ns=yt&ei=_NSDUuDMFMXc6QaTnYCwDA",cs-uri-extension="-",cs(Referer)="http://s.ytimg.com/yts/swfbin/player-vfle2Qpz1/watch_as3.swf",cs(User-Agent)="Mozilla/5.0 (Windows NT 5.1; rv:17.0) Gecko/20100101 Firefox/17.0",cs-bytes="1374",sc-status="204",sc-bytes="400",sc-filter-result="OBSERVED",sc-filter-category="Mixed%20Content/Potentially%20Adult",x-virus-id="-",x-exception-id="-",rs(Content-Type)="text/html;%20charset=UTF-8",duration="0",s-supplier-ip="74.125.196.101",cs(Cookie)="dkv=074ff5491b2ac93d89643b66573b1028e3QEAAAAdGxpcGkl1YNSMA==",s-computername="ATLTCP-WP01",s-port="880",cs-uri-stem="http://s.youtube.com/stream_204",cs-version="HTTP/1.0",

Now I write a test for it:

class LogFileTest extends UnitTestKit(ActorSystem("logFileTestSystem")) {
  behavior of "LogFile"

  "source with some log lines" should "complete processing successfully" in {

    val bcFile = new File("/Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log")
    val logFile: LogFile = LogFile(bcFile, system)
    logFile.process()
  }
}

class UnitTestKit(_system: ActorSystem) extends TestKit(_system)
with FlatSpecLike
with BeforeAndAfterAll
with ShouldMatchers {
  override protected def afterAll(): Unit = _system.shutdown()
}

When I run this test, I see:

16:10:14.849 [ScalaTest-run-running-LogFileTest] DEBUG com.learner.processor.LogFile - processing /Users/harit/Downloads/customer issues/bc/bluecoat_proxy_big/Demo_log_004.log

Process finished with exit code 0

No output is produced. What am I missing?

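Your problem is that runForeach returns a Future[Unit] and your test does not wait for it, so the test finishes right after the stream is started, afterAll shuts the actor system down, and the computation never actually gets to run. From the main class it works because the actor system is never shut down there, so its threads keep the JVM alive while the stream runs. Consuming the result of the logFile.process() call with Await.result, or with the whenReady utility from ScalaTest's ScalaFutures trait, should fix it.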
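To illustrate the idea of waiting on the returned Future, here is a minimal sketch that uses only the Scala standard library. The object name AwaitFutureSketch, its process method, and the 10-second timeout are all assumptions made for illustration; process is a hypothetical stand-in for LogFile.process(), not the actual Akka Streams code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitFutureSketch {
  // Hypothetical stand-in for LogFile.process(): it starts asynchronous work
  // and returns immediately with a Future that completes when the work is done.
  def process(lines: Seq[String]): Future[Unit] = Future {
    lines.foreach(println)
  }

  def main(args: Array[String]): Unit = {
    val done: Future[Unit] = process(Seq("line 1", "line 2"))
    // Block the calling thread until the Future completes, or fail with a
    // TimeoutException after the (arbitrarily chosen) 10 seconds.
    Await.result(done, 10.seconds)
  }
}
```

In the test from the question, the same pattern would presumably be Await.result(logFile.process(), 10.seconds) with a timeout suited to the file size. With ScalaFutures mixed in, whenReady(logFile.process()) { _ => ... } is the alternative; its default patience is short, so a longer timeout will likely be needed for a large log file.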