如何加速 scalaz-stream 文本处理?
How can I speed up scalaz-stream text processing?
如何加速以下 scalaz-stream
代码?目前处理 70MB 的文本大约需要 5 分钟,所以我可能做错了什么,因为普通的 scala 等价物需要几秒钟。
(跟进 )
val converter2: Task[Unit] = {
val docSep = "~~~"
io.linesR("myInput.txt")
.flatMap(line => { val words = line.split(" ");
if (words.length==0 || words(0)!=docSep) Process(line)
else Process(docSep, words.tail.mkString(" ")) })
.split(_ == docSep)
.filter(_ != Vector())
.map(lines => lines.head + ": " + lines.tail.mkString(" "))
.intersperse("\n")
.pipe(text.utf8Encode)
.to(io.fileChunkW("correctButSlowOutput.txt"))
.run
}
我认为您可以只使用一种 process1 分块方法来分块。如果您希望在将行合并为输出格式时进行大量并行处理,请确定有序输出是否重要,并使用与合并或发球台相结合的通道。这也将使它可重复使用。因为您正在执行非常少量的处理,所以您可能会被开销淹没,所以您必须更加努力地工作,以使您的工作单元足够大,不会被淹没。
以下内容基于@user1763729 的分块建议。虽然感觉很笨重,而且和原来的版本一样慢。
val converter: Task[Unit] = {
val docSep = "~~~"
io.linesR("myInput.txt")
.intersperse("\n") // handle empty documents (chunkBy has to switch from true to false)
.zipWithPrevious // chunkBy cuts only *after* the predicate turns false
.chunkBy{
case (Some(prev), line) => { val words = line.split(" "); words.length == 0 || words(0) != docSep }
case (None, line) => true }
.map(_.map(_._1.getOrElse(""))) // get previous element
.map(_.filter(!Set("", "\n").contains(_)))
.map(lines => lines.head.split(" ").tail.mkString(" ") + ": " + lines.tail.mkString(" "))
.intersperse("\n")
.pipe(text.utf8Encode)
.to(io.fileChunkW("stillSlowOutput.txt"))
.run
}
编辑:
实际上,执行以下操作(仅读取文件,不写入或处理)已经需要 1.5 分钟,所以我想加快速度的希望不大。
val converter: Task[Unit] = {
io.linesR("myInput.txt")
.pipe(text.utf8Encode)
.run
}
如何加速以下 scalaz-stream
代码?目前处理 70MB 的文本大约需要 5 分钟,所以我可能做错了什么,因为普通的 scala 等价物需要几秒钟。
(跟进
val converter2: Task[Unit] = {
val docSep = "~~~"
io.linesR("myInput.txt")
.flatMap(line => { val words = line.split(" ");
if (words.length==0 || words(0)!=docSep) Process(line)
else Process(docSep, words.tail.mkString(" ")) })
.split(_ == docSep)
.filter(_ != Vector())
.map(lines => lines.head + ": " + lines.tail.mkString(" "))
.intersperse("\n")
.pipe(text.utf8Encode)
.to(io.fileChunkW("correctButSlowOutput.txt"))
.run
}
我认为您可以只使用一种 process1 分块方法来分块。如果您希望在将行合并为输出格式时进行大量并行处理,请确定有序输出是否重要,并使用与合并或发球台相结合的通道。这也将使它可重复使用。因为您正在执行非常少量的处理,所以您可能会被开销淹没,所以您必须更加努力地工作,以使您的工作单元足够大,不会被淹没。
以下内容基于@user1763729 的分块建议。虽然感觉很笨重,而且和原来的版本一样慢。
val converter: Task[Unit] = {
val docSep = "~~~"
io.linesR("myInput.txt")
.intersperse("\n") // handle empty documents (chunkBy has to switch from true to false)
.zipWithPrevious // chunkBy cuts only *after* the predicate turns false
.chunkBy{
case (Some(prev), line) => { val words = line.split(" "); words.length == 0 || words(0) != docSep }
case (None, line) => true }
.map(_.map(_._1.getOrElse(""))) // get previous element
.map(_.filter(!Set("", "\n").contains(_)))
.map(lines => lines.head.split(" ").tail.mkString(" ") + ": " + lines.tail.mkString(" "))
.intersperse("\n")
.pipe(text.utf8Encode)
.to(io.fileChunkW("stillSlowOutput.txt"))
.run
}
编辑:
实际上,执行以下操作(仅读取文件,不写入或处理)已经需要 1.5 分钟,所以我想加快速度的希望不大。
val converter: Task[Unit] = {
io.linesR("myInput.txt")
.pipe(text.utf8Encode)
.run
}