使用 Scala Iterator 使用 RegEx 匹配将大流(从字符串)分解成块,然后对这些块进行操作?

Use Scala Iterator to break up large stream (from string) into chunks using a RegEx match, and then operate on those chunks?

我目前正在使用一种不太像 Scala 的方法来解析大型 Unix 邮箱文件。我仍在学习这门语言,并想挑战自己以找到更好的方法,但是,我不相信我对 Iterator 可以做什么以及如何有效地使用它有扎实的把握。

我目前正在使用 org.apache.james.mime4j,我使用org.apache.james.mime4j.mboxiterator.MboxIterator从文件中获取java.util.Iterator,如下所示:

 // registers an implementation of a ContentHandler that
 // allows me to construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler();

 // creates a parser that parses a SINGLE email from a given InputStream
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
 // register my handler
 parser.setContentHandler(handler);

 // Get a java.util.Iterator
 val iterator = MboxIterator.fromFile(fileName).build();
 // For each email, process it using above Handler
 iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))

根据我的理解,Scala Iterator 更健壮,并且可能更能够处理类似这样的事情,特别是因为我并不总是能够将整个文件放入内存中。

我需要构建自己的 MboxIterator 版本。我仔细研究了 MboxIterator 的源代码,并找到了一个很好的 RegEx 模式来确定各个电子邮件的开头,但是,从现在开始我将空白。

我是这样创建正则表达式的:

 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);

我想做什么(基于我目前所知道的):

我觉得我应该可以使用 map()find()filter()collect() 来完成这个,但我被事实上,他们只给我 Int 工作。

我该如何完成?

编辑:

在对这个问题做了更多思考之后,我想到了另一种方式来描述我认为我需要做的事情:

  1. 我需要继续从流中读取,直到获得与我的正则表达式匹配的字符串

  2. 也许group之前读取的字节?

  3. 送去某处处理

  4. 以某种方式将它从范围中删除,这样下次我 运行 进入匹配时它就不会被分组

  5. 继续阅读流,直到找到下一个匹配项。

  6. 利润???

编辑 2:

我想我越来越接近了。使用这样的方法让我得到一个迭代器的迭代器。但是,有两个问题: 1. 这是不是在浪费内存?这是否意味着所有内容都被读入内存? 2. 我仍然需要找出一种方法将 拆分为 match,但仍将其包含在返回的迭代器中。

def split[T](iter: Iterator[T])(breakOn: T => Boolean): 
    Iterator[Iterator[T]] =
        new Iterator[Iterator[T]] {
           def hasNext = iter.hasNext

           def next = {
              val cur = iter.takeWhile(!breakOn(_))
              iter.dropWhile(breakOn)
              cur
            }
 }.withFilter(l => l.nonEmpty)  

如果我没理解错的话,您想懒惰地分块一个由正则表达式可识别模式分隔的大文件。

您可以尝试为每个请求 return Iterator,但正确的迭代器管理并非易事。

我倾向于对客户端隐藏所有文件和迭代器管理。

class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}

测试:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None

每个打开的文件只有一个 Iterator,并且只调用安全方法。文件文本仅在请求时实现(加载),如果可用,客户端将得到请求的内容。您可以 return 将每一行作为集合的一部分,而不是一个长 String 中的所有行,Seq[String],如果这样更适用的话。


更新:这可以修改以方便迭代。

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}

现在您可以 .foreach().map().flatMap() 等。但是您也可以做危险的事情,例如 .toList,这将加载整个文件。