R 和 dplyr:如何在与源模式不同的模式中使用 compute() 从 SQL 查询创建持久性 table?
R and dplyr: How can I use compute() to create a persistent table from SQL query in a different schema than the source schema?
我有一个类似于 的问题。
如何从数据库中的 SQL 查询创建持久性 table(我使用 DB2 数据库)?我的目标是使用一个模式中的 table 并在另一个模式中永久创建或多或少修改过的 table。
目前有效的是将数据提取到 R
,然后在不同的架构中创建 table:
dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
collect() %>%
DBI::dbWriteTable(con, Id(schema = "SCHEMA_B", table = "NEW_TABLE"), ., overwrite = TRUE)
但是,我想将 compute()
函数合并到 dplyr
管道中,这样我就不必将数据拉入 R,也就是说,我想保留数据库中的数据。作为旁注:我不知道如何将 DBI
的 dbWriteTable()
替换为 dplyr
的 copy_to()
– 能够做到这一点也会对我有所帮助。
不幸的是,即使在阅读了 ?compute()
及其 online documentation 之后,我还是无法让它工作。以下代码框架不起作用并导致错误:
dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
dplyr::compute(in_schema("SCHEMA_B", "NEW_TABLE"), analyze = FALSE, temporary = FALSE)
是否有使用 compute()
的解决方案或适用于 dplyr 管道的其他解决方案?
我使用了一个自定义函数,它接受远程 table 后面的 SQL 查询,将其转换为可以在 SQL 服务器上执行的查询以保存新的 table,然后使用 DBI 包执行该查询。下面的关键详细信息,我的 GitHub 存储库 here.
中的完整详细信息(以及我认为有用的其他功能)
write_to_database <- function(input_tbl, db_connection, db, schema, tbl_name){
# SQL query
sql_query <- glue::glue("SELECT *\n",
"INTO {db}.{schema}.{tbl_name}\n",
"FROM (\n",
dbplyr::sql_render(input_tbl),
"\n) AS from_table")
# run query
DBI::dbExecute(db_connection, as.character(sql_query))
}
这个想法的本质是构建一个 SQL 查询,如果您直接用您的数据库语言执行它,将会得到您想要的结果。在我的应用程序中,它采用以下形式:
SELECT *
INTO db.schema.table
FROM (
/* sub query for existing table */
) AS alias
请注意,这是使用 SQL 服务器,您的特定 SQL 语法可能有所不同。 INTO
是用于编写 table 的 SQL 服务器模式。在问题中链接的示例中,语法为 TO TABLE
.
感谢@Simon.S.A.,我可以解决我的问题。正如他在回复中所展示的那样,可以定义自定义函数并将其合并到 dplyr
管道中。我改编的代码如下所示:
# Custom function
write_to_database <- function(input_tbl, db_connection, schema, tbl_name){
# SQL query
sql_query <- glue::glue("CREATE TABLE {schema}.{tbl_name} AS (\n",
"SELECT * FROM (\n",
dbplyr::sql_render(input_tbl),
"\n)) WITH DATA;")
# Drop table if it exists
DBI::dbExecute(con, glue::glue("BEGIN\n",
"IF EXISTS\n",
"(SELECT TABNAME FROM SYSCAT.TABLES WHERE TABSCHEMA = '{schema}' AND TABNAME = '{tbl_name}') THEN\n",
"PREPARE stmt FROM 'DROP TABLE {schema}.{tbl_name}';\n",
"EXECUTE stmt;\n",
"END IF;\n",
"END"))
# Run query
DBI::dbExecute(db_connection, as.character(sql_query))
}
# Dplyr pipeline
dplyr::tbl(con, in_schema("SCHEMA_A", "SOURCE_TABLE_NAME")) %>%
dplyr::filter(VARIABLE == "ABC") %>%
show_query() %>%
write_to_database(., con, "SCHEMA_B", "NEW_TABLE_NAME")
事实证明 DB2 似乎不知道 DROP TABLE IF EXISTS
因此需要一些额外的编程。我用这个 Whosebug post 来完成它。此外,在我的例子中,我不需要明确指定数据库,这样自定义函数中的参数 db
就被省略了。
我有一个类似于
如何从数据库中的 SQL 查询创建持久性 table(我使用 DB2 数据库)?我的目标是使用一个模式中的 table 并在另一个模式中永久创建或多或少修改过的 table。
目前有效的是将数据提取到 R
,然后在不同的架构中创建 table:
dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
collect() %>%
DBI::dbWriteTable(con, Id(schema = "SCHEMA_B", table = "NEW_TABLE"), ., overwrite = TRUE)
但是,我想将 compute()
函数合并到 dplyr
管道中,这样我就不必将数据拉入 R,也就是说,我想保留数据库中的数据。作为旁注:我不知道如何将 DBI
的 dbWriteTable()
替换为 dplyr
的 copy_to()
– 能够做到这一点也会对我有所帮助。
不幸的是,即使在阅读了 ?compute()
及其 online documentation 之后,我还是无法让它工作。以下代码框架不起作用并导致错误:
dplyr::tbl(con, in_schema("SCHEMA_A", "TABLE")) %>%
dplyr::compute(in_schema("SCHEMA_B", "NEW_TABLE"), analyze = FALSE, temporary = FALSE)
是否有使用 compute()
的解决方案或适用于 dplyr 管道的其他解决方案?
我使用了一个自定义函数,它接受远程 table 后面的 SQL 查询,将其转换为可以在 SQL 服务器上执行的查询以保存新的 table,然后使用 DBI 包执行该查询。下面的关键详细信息,我的 GitHub 存储库 here.
中的完整详细信息(以及我认为有用的其他功能)write_to_database <- function(input_tbl, db_connection, db, schema, tbl_name){
# SQL query
sql_query <- glue::glue("SELECT *\n",
"INTO {db}.{schema}.{tbl_name}\n",
"FROM (\n",
dbplyr::sql_render(input_tbl),
"\n) AS from_table")
# run query
DBI::dbExecute(db_connection, as.character(sql_query))
}
这个想法的本质是构建一个 SQL 查询,如果您直接用您的数据库语言执行它,将会得到您想要的结果。在我的应用程序中,它采用以下形式:
SELECT *
INTO db.schema.table
FROM (
/* sub query for existing table */
) AS alias
请注意,这是使用 SQL 服务器,您的特定 SQL 语法可能有所不同。 INTO
是用于编写 table 的 SQL 服务器模式。在问题中链接的示例中,语法为 TO TABLE
.
感谢@Simon.S.A.,我可以解决我的问题。正如他在回复中所展示的那样,可以定义自定义函数并将其合并到 dplyr
管道中。我改编的代码如下所示:
# Custom function
write_to_database <- function(input_tbl, db_connection, schema, tbl_name){
# SQL query
sql_query <- glue::glue("CREATE TABLE {schema}.{tbl_name} AS (\n",
"SELECT * FROM (\n",
dbplyr::sql_render(input_tbl),
"\n)) WITH DATA;")
# Drop table if it exists
DBI::dbExecute(con, glue::glue("BEGIN\n",
"IF EXISTS\n",
"(SELECT TABNAME FROM SYSCAT.TABLES WHERE TABSCHEMA = '{schema}' AND TABNAME = '{tbl_name}') THEN\n",
"PREPARE stmt FROM 'DROP TABLE {schema}.{tbl_name}';\n",
"EXECUTE stmt;\n",
"END IF;\n",
"END"))
# Run query
DBI::dbExecute(db_connection, as.character(sql_query))
}
# Dplyr pipeline
dplyr::tbl(con, in_schema("SCHEMA_A", "SOURCE_TABLE_NAME")) %>%
dplyr::filter(VARIABLE == "ABC") %>%
show_query() %>%
write_to_database(., con, "SCHEMA_B", "NEW_TABLE_NAME")
事实证明 DB2 似乎不知道 DROP TABLE IF EXISTS
因此需要一些额外的编程。我用这个 Whosebug post 来完成它。此外,在我的例子中,我不需要明确指定数据库,这样自定义函数中的参数 db
就被省略了。