R 和 dplyr:如何在与源模式不同的模式中使用 compute() 从 SQL 查询创建持久性 table?

R and dplyr: How can I use compute() to create a persistent table from SQL query in a different schema than the source schema?

我有一个类似于 的问题。

如何从数据库中的 SQL 查询创建持久性 table(我使用 DB2 数据库)?我的目标是使用一个模式中的 table 并在另一个模式中永久创建或多或少修改过的 table。

目前有效的是将数据提取到 R,然后在不同的架构中创建 table:

dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
collect() %>% 
DBI::dbWriteTable(con, Id(schema = "SCHEMA_B", table = "NEW_TABLE"), ., overwrite = TRUE)

但是,我想将 compute() 函数合并到 dplyr 管道中,这样我就不必将数据拉入 R,也就是说,我想保留数据库中的数据。作为旁注:我不知道如何将 DBIdbWriteTable() 替换为 dplyrcopy_to() – 能够做到这一点也会对我有所帮助。

不幸的是,即使在阅读了 ?compute() 及其 online documentation 之后,我还是无法让它工作。以下代码框架不起作用并导致错误:

dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
dplyr::compute(in_schema("SCHEMA_B", "NEW_TABLE"), analyze = FALSE, temporary = FALSE)

是否有使用 compute() 的解决方案或适用于 dplyr 管道的其他解决方案?

我使用了一个自定义函数,它接受远程 table 后面的 SQL 查询,将其转换为可以在 SQL 服务器上执行的查询以保存新的 table,然后使用 DBI 包执行该查询。下面的关键详细信息,我的 GitHub 存储库 here.

中的完整详细信息(以及我认为有用的其他功能)
write_to_database <- function(input_tbl, db_connection, db, schema, tbl_name){
  # SQL query
  sql_query <- glue::glue("SELECT *\n",
                          "INTO {db}.{schema}.{tbl_name}\n",
                          "FROM (\n",
                          dbplyr::sql_render(input_tbl),
                          "\n) AS from_table")
  
  # run query
  DBI::dbExecute(db_connection, as.character(sql_query))
}

这个想法的本质是构建一个 SQL 查询,如果您直接用您的数据库语言执行它,将会得到您想要的结果。在我的应用程序中,它采用以下形式:

SELECT *
INTO db.schema.table
FROM (
  /* sub query for existing table */
) AS alias

请注意,这是使用 SQL 服务器,您的特定 SQL 语法可能有所不同。 INTO 是用于编写 table 的 SQL 服务器模式。在问题中链接的示例中,语法为 TO TABLE.

感谢@Simon.S.A.,我可以解决我的问题。正如他在回复中所展示的那样,可以定义自定义函数并将其合并到 dplyr 管道中。我改编的代码如下所示:

# Custom function

write_to_database <- function(input_tbl, db_connection, schema, tbl_name){
  
  # SQL query

  sql_query <- glue::glue("CREATE TABLE {schema}.{tbl_name} AS (\n",
                      "SELECT * FROM (\n",
                      dbplyr::sql_render(input_tbl),
                      "\n)) WITH DATA;")

  # Drop table if it exists
  
  DBI::dbExecute(con, glue::glue("BEGIN\n",
                                    "IF EXISTS\n",
                                      "(SELECT TABNAME FROM SYSCAT.TABLES WHERE TABSCHEMA = '{schema}' AND TABNAME = '{tbl_name}') THEN\n",
                                        "PREPARE stmt FROM 'DROP TABLE {schema}.{tbl_name}';\n",
                                        "EXECUTE stmt;\n", 
                                    "END IF;\n",
                                 "END"))
  
  # Run query

  DBI::dbExecute(db_connection, as.character(sql_query))
}

# Dplyr pipeline

dplyr::tbl(con, in_schema("SCHEMA_A", "SOURCE_TABLE_NAME")) %>%
  dplyr::filter(VARIABLE == "ABC") %>% 
  show_query() %>% 
  write_to_database(., con, "SCHEMA_B", "NEW_TABLE_NAME")

事实证明 DB2 似乎不知道 DROP TABLE IF EXISTS 因此需要一些额外的编程。我用这个 Whosebug post 来完成它。此外,在我的例子中,我不需要明确指定数据库,这样自定义函数中的参数 db 就被省略了。