如何在没有 `collect()` 的情况下使用 `dplyr` 将数据附加到 PostgreSQL table?
How can I append data to a PostgreSQL table with `dplyr` without `collect()`?
table reg_data
是一个 PostgreSQL table。事实证明,运行 PostgreSQL 中的回归速度更快。但是,由于我正在 运行 对 100,000 个数据集进行处理,所以我想按数据集处理数据集,并将每个数据集的结果附加到 table。
有没有办法使用本机 dplyr
动词将 PostgreSQL 数据附加到 PostgreSQL table?我不确定将数据带到 R 然后将它们发送回 PostgreSQL 是否会产生巨大的成本(它只是 6 个数字和几个标识字段),但它看起来确实不雅。
library(dplyr)
pg <- src_postgres()
reg_data <- tbl(pg, "reg_data")
reg_results <-
reg_data %>%
summarize(r_squared=regr_r2(y, x),
num_obs=regr_count(y, x),
constant=regr_intercept(y, x),
slope=regr_slope(y, x),
mean_analyst_fog=regr_avgx(y, x),
mean_manager_fog=regr_avgy(y, x)) %>%
collect() %>%
as.data.frame()
# Push to database.
dbWriteTable(pg$con, c("bgt", "within_call_data"), reg_results,
append=TRUE, row.names=FALSE)
dplyr
不包括在数据库中插入或更新记录的命令,因此没有完整的本机 dplyr
解决方案。但是您可以将 dplyr 与常规 SQL 语句结合使用,以避免将数据带到 R.
让我们从在 collect()
语句
之前重现您的步骤开始
library(dplyr)
pg <- src_postgres()
reg_data <- tbl(pg, "reg_data")
reg_results <-
reg_data %>%
summarize(r_squared=regr_r2(y, x),
num_obs=regr_count(y, x),
constant=regr_intercept(y, x),
slope=regr_slope(y, x),
mean_analyst_fog=regr_avgx(y, x),
mean_manager_fog=regr_avgy(y, x))
现在,您可以使用 compute()
而不是 collect()
在数据库中创建临时 table。
temp.table.name <- paste0(sample(letters, 10, replace = TRUE), collapse = "")
reg_results <- reg_results %>% compute(name=temp.table.name)
其中 temp.table.name
是随机的 table 名称。在计算中使用选项 name = temp.table.name
,我们将这个随机名称分配给创建的临时 table。
现在,我们将使用库 RPostgreSQL
创建一个使用存储在临时 table 中的结果的插入查询。由于临时 table 仅存在于 src_postgresql()
创建的连接中,我们需要重用它。
library(RPostgreSQL)
copyconn <- pg$con
class(copyconn) <- "PostgreSQLConnection" # I get an error if I don't fix the class
最后是插入查询
sql <- paste0("INSERT INTO destination_table SELECT * FROM ", temp.tbl.name,";")
dbSendQuery(copyconn, sql)
所以,一切都发生在数据库中,数据没有进入 R。
编辑
当我们从 reg_results
获得 temp.tbl.name
时,此 post 的先前版本确实破坏了封装。使用选项 name=
in compute.
可以避免这种情况
另一种选择是使用名为 sql_render()
的命令创建每个 SQL 语句,然后使用另一个名为 db_save_query()
的命令使用 table 创建 table =19=] 语句,然后是附加到 table 的手动语句。要遍历每个查询,使用 purrr
命令:map
和 walk
。优选地,像 compute()
命令这样的命令应该执行此操作,但取而代之的是,以下是一个完全可重现的示例:
library(dplyr)
library(dbplyr)
library(purrr)
# Setting up a SQLite db with 3 tables
con <- DBI::dbConnect(RSQLite::SQLite(), path = ":memory:")
copy_to(con, filter(mtcars, cyl == 4), "mtcars1")
copy_to(con, filter(mtcars, cyl == 6), "mtcars2")
copy_to(con, filter(mtcars, cyl == 8), "mtcars3")
# Pre-process the SQL statements
tables <- c("mtcars1","mtcars2","mtcars3")
all_results <- tables %>%
map(~{
tbl(con, .x) %>%
summarise(avg_mpg = mean(mpg),
records = n()) %>%
sql_render()
})
# Execute the SQL statements, 1st one creates the table
# subsquent queries are insterted to the table
first_query <- TRUE
all_results %>%
walk(~{
if(first_query == TRUE){
first_query <<- FALSE
db_save_query(con, ., "results")
} else {
dbExecute(con, build_sql("INSERT INTO results ", .))
}
})
tbl(con, "results")
dbDisconnect(con)
table reg_data
是一个 PostgreSQL table。事实证明,运行 PostgreSQL 中的回归速度更快。但是,由于我正在 运行 对 100,000 个数据集进行处理,所以我想按数据集处理数据集,并将每个数据集的结果附加到 table。
有没有办法使用本机 dplyr
动词将 PostgreSQL 数据附加到 PostgreSQL table?我不确定将数据带到 R 然后将它们发送回 PostgreSQL 是否会产生巨大的成本(它只是 6 个数字和几个标识字段),但它看起来确实不雅。
library(dplyr)
pg <- src_postgres()
reg_data <- tbl(pg, "reg_data")
reg_results <-
reg_data %>%
summarize(r_squared=regr_r2(y, x),
num_obs=regr_count(y, x),
constant=regr_intercept(y, x),
slope=regr_slope(y, x),
mean_analyst_fog=regr_avgx(y, x),
mean_manager_fog=regr_avgy(y, x)) %>%
collect() %>%
as.data.frame()
# Push to database.
dbWriteTable(pg$con, c("bgt", "within_call_data"), reg_results,
append=TRUE, row.names=FALSE)
dplyr
不包括在数据库中插入或更新记录的命令,因此没有完整的本机 dplyr
解决方案。但是您可以将 dplyr 与常规 SQL 语句结合使用,以避免将数据带到 R.
让我们从在 collect()
语句
library(dplyr)
pg <- src_postgres()
reg_data <- tbl(pg, "reg_data")
reg_results <-
reg_data %>%
summarize(r_squared=regr_r2(y, x),
num_obs=regr_count(y, x),
constant=regr_intercept(y, x),
slope=regr_slope(y, x),
mean_analyst_fog=regr_avgx(y, x),
mean_manager_fog=regr_avgy(y, x))
现在,您可以使用 compute()
而不是 collect()
在数据库中创建临时 table。
temp.table.name <- paste0(sample(letters, 10, replace = TRUE), collapse = "")
reg_results <- reg_results %>% compute(name=temp.table.name)
其中 temp.table.name
是随机的 table 名称。在计算中使用选项 name = temp.table.name
,我们将这个随机名称分配给创建的临时 table。
现在,我们将使用库 RPostgreSQL
创建一个使用存储在临时 table 中的结果的插入查询。由于临时 table 仅存在于 src_postgresql()
创建的连接中,我们需要重用它。
library(RPostgreSQL)
copyconn <- pg$con
class(copyconn) <- "PostgreSQLConnection" # I get an error if I don't fix the class
最后是插入查询
sql <- paste0("INSERT INTO destination_table SELECT * FROM ", temp.tbl.name,";")
dbSendQuery(copyconn, sql)
所以,一切都发生在数据库中,数据没有进入 R。
编辑
当我们从 reg_results
获得 temp.tbl.name
时,此 post 的先前版本确实破坏了封装。使用选项 name=
in compute.
另一种选择是使用名为 sql_render()
的命令创建每个 SQL 语句,然后使用另一个名为 db_save_query()
的命令使用 table 创建 table =19=] 语句,然后是附加到 table 的手动语句。要遍历每个查询,使用 purrr
命令:map
和 walk
。优选地,像 compute()
命令这样的命令应该执行此操作,但取而代之的是,以下是一个完全可重现的示例:
library(dplyr)
library(dbplyr)
library(purrr)
# Setting up a SQLite db with 3 tables
con <- DBI::dbConnect(RSQLite::SQLite(), path = ":memory:")
copy_to(con, filter(mtcars, cyl == 4), "mtcars1")
copy_to(con, filter(mtcars, cyl == 6), "mtcars2")
copy_to(con, filter(mtcars, cyl == 8), "mtcars3")
# Pre-process the SQL statements
tables <- c("mtcars1","mtcars2","mtcars3")
all_results <- tables %>%
map(~{
tbl(con, .x) %>%
summarise(avg_mpg = mean(mpg),
records = n()) %>%
sql_render()
})
# Execute the SQL statements, 1st one creates the table
# subsquent queries are insterted to the table
first_query <- TRUE
all_results %>%
walk(~{
if(first_query == TRUE){
first_query <<- FALSE
db_save_query(con, ., "results")
} else {
dbExecute(con, build_sql("INSERT INTO results ", .))
}
})
tbl(con, "results")
dbDisconnect(con)