使用 readr::read_csv_chunked() 分块读取 csv 文件

Reading csv files in chunks with `readr::read_csv_chunked()`

我想读取更大的 csv 文件,但 运行 遇到内存问题。因此,我想尝试使用 readr 包中的 read_csv_chunked() 分块阅读它们。我的问题是我不太理解 callback 论点。

这是我迄今为止尝试过的最小示例(我知道我必须将所需的操作包含到 f() 中,否则在内存使用方面不会有优势,对吧? ):

library(tidyverse)
data(diamonds)
write_csv(diamonds, "diamonds.csv") # to have a csv to read

f <- function(x) {x}
diamonds_chunked <- read_csv_chunked("diamonds.csv", 
                                     callback = DataFrameCallback$new(f),
                                     chunk_size = 10000)

我试图使 callback 参数接近官方文档中的示例:

# Cars with 3 gears
f <- function(x, pos) subset(x, gear == 3)
read_csv_chunked(readr_example("mtcars.csv"), 
                 DataFrameCallback$new(f), 
                 chunk_size = 5)

但是,我收到以下错误,似乎是在读取第一个块后出现的,因为我看到进度条移动到 18%。

Error in eval(substitute(expr), envir, enclos) : unused argument (index)

我已经尝试在 f() 中包含我想要进行的操作,但我仍然遇到同样的错误。

我发现要在 DataFrameCallback$new() 中调用的函数总是需要有一个额外的参数(文档示例中的 pos)。不必使用此参数,因此我并不真正了解其目的。但至少,它是这样工作的。

有人知道关于第二个参数的更多细节吗?

pos表示位置,它是每个块中第一行的索引号。使用此回调函数,您可以处理块中的每一行。

下面是来自官方的例子 https://readr.tidyverse.org/reference/callback.html

ChunkCallback Callback interface definition, all callback functions should inherit from this class.

SideEffectChunkCallback Callback function that is used only for side effects, no results are returned.

DataFrameCallback Callback function that combines each result together at the end.

AccumulateCallBack Callback function that accumulates a single result. Requires the parameter acc to specify the initial value of the accumulator. The parameter acc is NULL by default.

# Print starting line of each chunk
f <- function(x, pos) print(pos)
read_lines_chunked(readr_example("mtcars.csv"), SideEffectChunkCallback$new(f), chunk_size = 5)

# The ListCallback can be used for more flexible output
f <- function(x, pos) x$mpg[x$hp > 100]
read_csv_chunked(readr_example("mtcars.csv"), ListCallback$new(f), chunk_size = 5)