使用 R 目标将新数据附加到现有数据
Using R Targets to Append New Data to Exisiting Data
我正在使用 targets
工作流程。此管道的一部分是监视 csv 文件目录的更新。此目录中有超过 10,000 个 csv 文件,并且每周添加新文件。我希望能够识别新添加的文件并将它们附加到现有的 *.rds
文件集。最简单的方法是每周重新 运行 创建 *.rds
文件的 5 个子集的过程,但这需要时间。有效的方法是识别新添加的文件,并简单地 bind_rows
使用正确的 rds
文件。
我可以通过使用 dir()
和 setdiff()
的典型编程轻松地做到这一点,我在其中存储了前一天的 csv 文件路径快照。但我正在努力在 targets
框架内实现这一目标。
这是一个似乎不起作用的尝试。我想我想监视 /_targets
目录中的临时结果,但我不确定该怎么做。而且,targets
文档建议不要在目标配置本身内部使用 tar_load
。
tar_script({
list(
tar_target(csv_directory, "/csv/"),
tar_target(csv_snapshot, dir(csv_directory)),
tar_target(append_action, if(length(setdiff(dir(csv_directory), dir(csv_snapshot))) > 0){
...}
})
一些可能有用的组件:
- 文件目标:https://books.ropensci.org/targets/files.html。使用
tar_target(format = "file")
,程序包监视输入 and/or 输出文件的更改并重新运行受影响的目标(如果有)。
- 替代存储格式:https://docs.ropensci.org/targets/reference/tar_target.html#storage-formats。与其将您的 CSV 文件聚合到外部 RDS 文件中,不如使用
tar_target(format = "feather")
之类的东西更有效,因此 targets
会自动压缩您的输出数据并确保您不必担心文件的微观管理。
- 动态分支:books.ropensci.org/targets/dynamic.html。动态分支是一种在管道 运行 时定义大量新目标集合的方法。例如,这使您可以为一个文件或一批现有文件创建一个新目标。
- 批处理:https://books.ropensci.org/targets/dynamic.html#batching。 10000 个目标很多,
targets
程序包可以放慢速度,因为每个目标都有间接成本。
因此,我建议您将 CSV 文件组织成批次(例如,按周),并动态地对批次进行分支以处理它们。另一种批处理结构可能更合适,具体取决于您的用例的具体情况。
csv/
├── week1/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
├── week2/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
...
管线草图:
# _targets.R
process_csv_dir <- function(csv_dir) {...} # custom user-defined function
list(
tar_target(csv_dir, list.files("csv", full.names = TRUE)),
tar_target(
processed_data,
process_csv_dir(csv_dir),
pattern = map(csv_dir), # dynamic branching
format = "feather" # from the arrow package
)
)
我正在使用 targets
工作流程。此管道的一部分是监视 csv 文件目录的更新。此目录中有超过 10,000 个 csv 文件,并且每周添加新文件。我希望能够识别新添加的文件并将它们附加到现有的 *.rds
文件集。最简单的方法是每周重新 运行 创建 *.rds
文件的 5 个子集的过程,但这需要时间。有效的方法是识别新添加的文件,并简单地 bind_rows
使用正确的 rds
文件。
我可以通过使用 dir()
和 setdiff()
的典型编程轻松地做到这一点,我在其中存储了前一天的 csv 文件路径快照。但我正在努力在 targets
框架内实现这一目标。
这是一个似乎不起作用的尝试。我想我想监视 /_targets
目录中的临时结果,但我不确定该怎么做。而且,targets
文档建议不要在目标配置本身内部使用 tar_load
。
tar_script({
list(
tar_target(csv_directory, "/csv/"),
tar_target(csv_snapshot, dir(csv_directory)),
tar_target(append_action, if(length(setdiff(dir(csv_directory), dir(csv_snapshot))) > 0){
...}
})
一些可能有用的组件:
- 文件目标:https://books.ropensci.org/targets/files.html。使用
tar_target(format = "file")
,程序包监视输入 and/or 输出文件的更改并重新运行受影响的目标(如果有)。 - 替代存储格式:https://docs.ropensci.org/targets/reference/tar_target.html#storage-formats。与其将您的 CSV 文件聚合到外部 RDS 文件中,不如使用
tar_target(format = "feather")
之类的东西更有效,因此targets
会自动压缩您的输出数据并确保您不必担心文件的微观管理。 - 动态分支:books.ropensci.org/targets/dynamic.html。动态分支是一种在管道 运行 时定义大量新目标集合的方法。例如,这使您可以为一个文件或一批现有文件创建一个新目标。
- 批处理:https://books.ropensci.org/targets/dynamic.html#batching。 10000 个目标很多,
targets
程序包可以放慢速度,因为每个目标都有间接成本。
因此,我建议您将 CSV 文件组织成批次(例如,按周),并动态地对批次进行分支以处理它们。另一种批处理结构可能更合适,具体取决于您的用例的具体情况。
csv/
├── week1/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
├── week2/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
...
管线草图:
# _targets.R
process_csv_dir <- function(csv_dir) {...} # custom user-defined function
list(
tar_target(csv_dir, list.files("csv", full.names = TRUE)),
tar_target(
processed_data,
process_csv_dir(csv_dir),
pattern = map(csv_dir), # dynamic branching
format = "feather" # from the arrow package
)
)