Scala 中的惰性分页(Stream/Iterator 个迭代器?)
Lazy Pagination in Scala (Stream/Iterator of Iterators?)
我通过调用 def readPage(pageNumber: Int): Iterator[Record]
一次一页地从数据库 API 中顺序读取大量记录(每页记录数未知)
我正在尝试将这个 API 懒惰地包装在 Stream[Iterator[Record]]
或 Iterator[Iterator[Record]]
之类的东西中,以一种功能性的方式,理想情况下没有可变状态,具有恒定的内存占用,所以我可以将其视为无限的页面流或迭代器序列,并从客户端抽象出分页。客户端可以迭代结果,通过调用 next() 它将检索下一页 (Iterator[Record])。
在 Scala 中实现它的最惯用和最有效的方法是什么。
编辑:需要一次一页地获取和处理记录,无法在内存中维护所有页面的所有记录。如果一页失败,则抛出异常。大量的 pages/records 意味着所有实际目的都是无限的。我想将其视为页面的无限流(或迭代器),每个页面都是有限数量记录的迭代器(例如,小于 <1000,但如果时间提前,确切数量未知)。
我在 Monix 中查看了 BatchCursor,但它有不同的用途。
编辑 2:这是使用下面 Tomer 的 作为起点的当前版本,但使用 Stream 而不是 Iterator。
这允许根据 消除尾递归的需要,并且流前置 #::
操作有 O(1) 时间(而如果我们通过 ++
操作连接迭代器,它将是 O(n))
注意:虽然流是惰性求值的,Stream memoization may still cause memory blow up, and memory management gets tricky。从val
改为def
来定义下面def pages = readAllPages
中的Stream似乎没有任何效果
def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
val iter: Iterator[Record] = readPage(pageNumber)
if (iter.isEmpty)
Stream.empty
else
iter #:: readAllPages(pageNumber + 1)
}
//usage
val pages = readAllPages
for{
page<-pages
record<-page
if(isValid(record))
}
process(record)
编辑 3:
Tomer 的第二个建议似乎是最好的,其运行时和内存占用与上述解决方案类似,但更简洁且容易出错。
val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
注意:Stream.from(1)
创建一个从1开始递增1的流,它在API docs
你可以尝试实现这样的逻辑:
def readPage(pageNumber: Int): Iterator[Record] = ???
@tailrec
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
val iter = readPage(pageNumber)
if (iter.nonEmpty) {
// Compute on records
// When finishing computing:
Iterator(iter) ++ readAllPages(pageNumber + 1)
} else {
Iterator.empty
}
}
readAllPages(0)
一个较短的选项是:
Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
我通过调用 def readPage(pageNumber: Int): Iterator[Record]
我正在尝试将这个 API 懒惰地包装在 Stream[Iterator[Record]]
或 Iterator[Iterator[Record]]
之类的东西中,以一种功能性的方式,理想情况下没有可变状态,具有恒定的内存占用,所以我可以将其视为无限的页面流或迭代器序列,并从客户端抽象出分页。客户端可以迭代结果,通过调用 next() 它将检索下一页 (Iterator[Record])。
在 Scala 中实现它的最惯用和最有效的方法是什么。
编辑:需要一次一页地获取和处理记录,无法在内存中维护所有页面的所有记录。如果一页失败,则抛出异常。大量的 pages/records 意味着所有实际目的都是无限的。我想将其视为页面的无限流(或迭代器),每个页面都是有限数量记录的迭代器(例如,小于 <1000,但如果时间提前,确切数量未知)。
我在 Monix 中查看了 BatchCursor,但它有不同的用途。
编辑 2:这是使用下面 Tomer 的 #::
操作有 O(1) 时间(而如果我们通过 ++
操作连接迭代器,它将是 O(n))
注意:虽然流是惰性求值的,Stream memoization may still cause memory blow up, and memory management gets tricky。从val
改为def
来定义下面def pages = readAllPages
中的Stream似乎没有任何效果
def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
val iter: Iterator[Record] = readPage(pageNumber)
if (iter.isEmpty)
Stream.empty
else
iter #:: readAllPages(pageNumber + 1)
}
//usage
val pages = readAllPages
for{
page<-pages
record<-page
if(isValid(record))
}
process(record)
编辑 3: Tomer 的第二个建议似乎是最好的,其运行时和内存占用与上述解决方案类似,但更简洁且容易出错。
val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
注意:Stream.from(1)
创建一个从1开始递增1的流,它在API docs
你可以尝试实现这样的逻辑:
def readPage(pageNumber: Int): Iterator[Record] = ???
@tailrec
def readAllPages(pageNumber: Int): Iterator[Iterator[Record]] = {
val iter = readPage(pageNumber)
if (iter.nonEmpty) {
// Compute on records
// When finishing computing:
Iterator(iter) ++ readAllPages(pageNumber + 1)
} else {
Iterator.empty
}
}
readAllPages(0)
一个较短的选项是:
Stream.from(1).map(readPage).takeWhile(_.nonEmpty)