使用 scala fs2 文件流从文件中删除过滤行
remove the filtered line from the file using scala fs2 file streaming
如何使用 fs2 从当前流文件中删除 filtered
行并获取 return 类型的过滤行数?
例如:如果 old.txt
包含由换行符 (\n) 分隔的字符串:
john
sam
chen
yval
....
和val myList = List("chen","yval")
.
def converter[F[_]](implicit F: Sync[F]): F[Unit] =
io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => myList.contains(s))//remove this from the old file and write to new file
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/new.txt")))
.compile.drain
// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
可以使用observe method的Stream
class。
您正在寻找一个函数def converter[F[_]: Sync]: F[Int]
,
它产生一个计算 F[Int]
,其结果(Int
类型)是过滤行的数量,其作用是将这些行写入输出文件。按照管道类比,您希望将过滤后的流 馈送 到两个输出,一个用于结果,一个用于效果。
您可以使用定义为
的函数 observe 来执行此操作
def observe(sink: Sink[F, O])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O]
A Sink[F,O]
是函数 Stream[F, O] => Stream[F, Unit]
的别名。在您的情况下,sink 是将过滤后的流写入输出文件的代码的一部分:
def writeToFile[F[_]: Sync]: Sink[F, String] =
_.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/new.txt")))
另一个输出是减少,或者说折叠,
def converter[F[_]](implicit F: Effect[F], ec: ExecutionContext): F[Int] =
io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => myList.contains(s))
.observe(writeToFile[F])
.compile
.fold[Int](0)( (c, _) => c+1)
}
注意:对于此解决方案,您需要将F
的类型class限制为Effect
,并且您需要使用 ExecutionContext
。 fold
定义在 ToEffect
class 中。
如何使用 fs2 从当前流文件中删除 filtered
行并获取 return 类型的过滤行数?
例如:如果 old.txt
包含由换行符 (\n) 分隔的字符串:
john
sam
chen
yval
....
和val myList = List("chen","yval")
.
def converter[F[_]](implicit F: Sync[F]): F[Unit] =
io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => myList.contains(s))//remove this from the old file and write to new file
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/new.txt")))
.compile.drain
// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
可以使用observe method的Stream
class。
您正在寻找一个函数def converter[F[_]: Sync]: F[Int]
,
它产生一个计算 F[Int]
,其结果(Int
类型)是过滤行的数量,其作用是将这些行写入输出文件。按照管道类比,您希望将过滤后的流 馈送 到两个输出,一个用于结果,一个用于效果。
您可以使用定义为
def observe(sink: Sink[F, O])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O]
A Sink[F,O]
是函数 Stream[F, O] => Stream[F, Unit]
的别名。在您的情况下,sink 是将过滤后的流写入输出文件的代码的一部分:
def writeToFile[F[_]: Sync]: Sink[F, String] =
_.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/new.txt")))
另一个输出是减少,或者说折叠,
def converter[F[_]](implicit F: Effect[F], ec: ExecutionContext): F[Int] =
io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => myList.contains(s))
.observe(writeToFile[F])
.compile
.fold[Int](0)( (c, _) => c+1)
}
注意:对于此解决方案,您需要将F
的类型class限制为Effect
,并且您需要使用 ExecutionContext
。 fold
定义在 ToEffect
class 中。