流结束后如何关闭文件?
How can I close file after stream ending?
我有一个 java.io.File 的序列流。我使用 flatMapConcat
创建新的 Source
文件,类似于:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
有没有一种简单的方法可以在流结束后关闭每个文件? SomePublisher()
无法关闭。
因此,如果我理解正确的话,您可以执行以下操作:为每个文件创建数据库对象,然后打开文件。因此,由于您在代码中打开数据库连接,因此您有责任关闭它。由于您使用的是有限的文件列表,因此您可以按顺序存储所有数据库连接,运行 您的流并在流结束后关闭所有连接。
另一种方法是让您自己的发布者获取文件名、打开数据库连接、从中流式传输、关闭数据库连接。第二个选项将允许您从无限的文件列表中流式传输。
如果你想要我的代码片段,请给我你的函数的完整源代码,我会更新它。
所以我在很多方法中找到了一个很好的方法来解决我的问题,但如果你有另一种方法,我也希望看到它。
def someSource(file: File) = {
val f = openFile(file)
Source
.fromPublisher(SomePublisher(f))
.transform(() => new PushStage[?, ?] {
override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
override def postStop(): Unit = {
f.close()
super.postStop()
}
}
}
def func(files: List[File]) =
Source(files)
.flatMapConcat(someSource)
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
我有一个 java.io.File 的序列流。我使用 flatMapConcat
创建新的 Source
文件,类似于:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
有没有一种简单的方法可以在流结束后关闭每个文件? SomePublisher()
无法关闭。
因此,如果我理解正确的话,您可以执行以下操作:为每个文件创建数据库对象,然后打开文件。因此,由于您在代码中打开数据库连接,因此您有责任关闭它。由于您使用的是有限的文件列表,因此您可以按顺序存储所有数据库连接,运行 您的流并在流结束后关闭所有连接。
另一种方法是让您自己的发布者获取文件名、打开数据库连接、从中流式传输、关闭数据库连接。第二个选项将允许您从无限的文件列表中流式传输。
如果你想要我的代码片段,请给我你的函数的完整源代码,我会更新它。
所以我在很多方法中找到了一个很好的方法来解决我的问题,但如果你有另一种方法,我也希望看到它。
def someSource(file: File) = {
val f = openFile(file)
Source
.fromPublisher(SomePublisher(f))
.transform(() => new PushStage[?, ?] {
override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
override def postStop(): Unit = {
f.close()
super.postStop()
}
}
}
def func(files: List[File]) =
Source(files)
.flatMapConcat(someSource)
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)