如何在没有 `collect()` 的情况下使用 `dplyr` 将数据附加到 PostgreSQL table?

How can I append data to a PostgreSQL table with `dplyr` without `collect()`?

table reg_data 是一个 PostgreSQL table。事实证明,运行 PostgreSQL 中的回归速度更快。但是,由于我正在 运行 对 100,000 个数据集进行处理,所以我想按数据集处理数据集,并将每个数据集的结果附加到 table。

有没有办法使用本机 dplyr 动词将 PostgreSQL 数据附加到 PostgreSQL table?我不确定将数据带到 R 然后将它们发送回 PostgreSQL 是否会产生巨大的成本(它只是 6 个数字和几个标识字段),但它看起来确实不雅。

library(dplyr)

pg <- src_postgres()

reg_data <- tbl(pg, "reg_data")

reg_results <-
    reg_data %>%
    summarize(r_squared=regr_r2(y, x),
              num_obs=regr_count(y, x),
              constant=regr_intercept(y, x),
              slope=regr_slope(y, x),
              mean_analyst_fog=regr_avgx(y, x),
              mean_manager_fog=regr_avgy(y, x)) %>%
    collect() %>%
    as.data.frame()

# Push to database.
dbWriteTable(pg$con, c("bgt", "within_call_data"), reg_results,
             append=TRUE, row.names=FALSE)

dplyr 不包括在数据库中插入或更新记录的命令,因此没有完整的本机 dplyr 解决方案。但是您可以将 dplyr 与常规 SQL 语句结合使用,以避免将数据带到 R.

让我们从在 collect() 语句

之前重现您的步骤开始
library(dplyr)

pg <- src_postgres()

reg_data <- tbl(pg, "reg_data")

reg_results <-
    reg_data %>%
    summarize(r_squared=regr_r2(y, x),
              num_obs=regr_count(y, x),
              constant=regr_intercept(y, x),
              slope=regr_slope(y, x),
              mean_analyst_fog=regr_avgx(y, x),
              mean_manager_fog=regr_avgy(y, x))

现在,您可以使用 compute() 而不是 collect() 在数据库中创建临时 table。

temp.table.name <- paste0(sample(letters, 10, replace = TRUE), collapse = "")

reg_results <- reg_results %>% compute(name=temp.table.name)

其中 temp.table.name 是随机的 table 名称。在计算中使用选项 name = temp.table.name,我们将这个随机名称分配给创建的临时 table。

现在,我们将使用库 RPostgreSQL 创建一个使用存储在临时 table 中的结果的插入查询。由于临时 table 仅存在于 src_postgresql() 创建的连接中,我们需要重用它。

library(RPostgreSQL)
copyconn <- pg$con
class(copyconn) <- "PostgreSQLConnection" # I get an error if I don't fix the class

最后是插入查询

sql <- paste0("INSERT INTO destination_table SELECT * FROM ", temp.tbl.name,";")

dbSendQuery(copyconn, sql)

所以,一切都发生在数据库中,数据没有进入 R。

编辑

当我们从 reg_results 获得 temp.tbl.name 时,此 post 的先前版本确实破坏了封装。使用选项 name=in compute.

可以避免这种情况

另一种选择是使用名为 sql_render() 的命令创建每个 SQL 语句,然后使用另一个名为 db_save_query() 的命令使用 table 创建 table =19=] 语句,然后是附加到 table 的手动语句。要遍历每个查询,使用 purrr 命令:mapwalk。优选地,像 compute() 命令这样的命令应该执行此操作,但取而代之的是,以下是一个完全可重现的示例:

library(dplyr)
library(dbplyr)
library(purrr)

# Setting up a SQLite db with 3 tables
con <- DBI::dbConnect(RSQLite::SQLite(), path = ":memory:")
copy_to(con, filter(mtcars, cyl == 4), "mtcars1")
copy_to(con, filter(mtcars, cyl == 6), "mtcars2")
copy_to(con, filter(mtcars, cyl == 8), "mtcars3")



# Pre-process the SQL statements
tables <- c("mtcars1","mtcars2","mtcars3")
all_results <- tables %>%
  map(~{
    tbl(con, .x) %>%
      summarise(avg_mpg = mean(mpg),
                records = n()) %>%
      sql_render() 
  })

# Execute the SQL statements, 1st one creates the table
# subsquent queries are insterted to the table
first_query <- TRUE
all_results %>%
  walk(~{
    if(first_query == TRUE){
      first_query <<- FALSE
      db_save_query(con, ., "results")
    } else {
      dbExecute(con, build_sql("INSERT INTO results ", .))
    }
  })


tbl(con, "results")

dbDisconnect(con)