运行 从数据库加载大量记录时内存不足
Running out of memory when loading large amount of records from database
我在 Akka Streams 中使用 slick 从数据库 (postgresql) 加载大量记录 (~2M) 并将它们写入 S3 文件。但是,我注意到我的下面代码适用于大约 50k 左右的记录,但对于超过 100k 标记的任何记录都失败。
val allResults: Future[Seq[MyEntityImpl]] =
MyRepository.getAllRecordss()
val results: Future[MultipartUploadResult] = Source
.fromFuture(allResults)
.map(seek => seek.toList)
.mapConcat(identity)
.map(myEntity => myEntity.toPSV + "\n")
.map(s => ByteString(s))
.runWith(s3Sink)
下面是 myEntity
的示例:
case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
我正在寻找一种以更具反应性的方式执行此操作的方法,这样它就不会 运行 内存不足。
潜在问题
问题是您在将所有记录从数据库中提取到本地内存中,然后再将它们分派到 s3Sink
。
第一个将数据拉入内存的地方可能在您的 MyRepository.getAllRecords()
方法中。大多数(如果不是全部)Seq
实现都是基于内存的。第二个肯定使用本地内存的地方是 seek.toList
,因为 List
将所有数据存储在内存中。
解决方案
而不是从 getAllRecords
返回 Seq
。这将确保您的物化流在转到 s3 之前只需要用于瞬态处理步骤的内存。
如果您的方法定义更改为:
def getAllRecords() : Source[MyEntityImpl, _]
然后流的其余部分将以反应方式运行:
MyRepository
.getAllRecords()
.map(myEntity => myEntity.toPSV + "\n")
.map(ByteString.apply)
.runWith(s3Sink)
我在 Akka Streams 中使用 slick 从数据库 (postgresql) 加载大量记录 (~2M) 并将它们写入 S3 文件。但是,我注意到我的下面代码适用于大约 50k 左右的记录,但对于超过 100k 标记的任何记录都失败。
val allResults: Future[Seq[MyEntityImpl]] =
MyRepository.getAllRecordss()
val results: Future[MultipartUploadResult] = Source
.fromFuture(allResults)
.map(seek => seek.toList)
.mapConcat(identity)
.map(myEntity => myEntity.toPSV + "\n")
.map(s => ByteString(s))
.runWith(s3Sink)
下面是 myEntity
的示例:
case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
我正在寻找一种以更具反应性的方式执行此操作的方法,这样它就不会 运行 内存不足。
潜在问题
问题是您在将所有记录从数据库中提取到本地内存中,然后再将它们分派到 s3Sink
。
第一个将数据拉入内存的地方可能在您的 MyRepository.getAllRecords()
方法中。大多数(如果不是全部)Seq
实现都是基于内存的。第二个肯定使用本地内存的地方是 seek.toList
,因为 List
将所有数据存储在内存中。
解决方案
而不是从 getAllRecords
Seq
。这将确保您的物化流在转到 s3 之前只需要用于瞬态处理步骤的内存。
如果您的方法定义更改为:
def getAllRecords() : Source[MyEntityImpl, _]
然后流的其余部分将以反应方式运行:
MyRepository
.getAllRecords()
.map(myEntity => myEntity.toPSV + "\n")
.map(ByteString.apply)
.runWith(s3Sink)