如何有效地将许多变量粘贴到 sql 查询中(R Shiny)
How to efficiently paste many variables into a sql query (Rshiny)
我正在构建一个闪亮的应用程序,用户可以通过编辑 DT:table 中的选定行来更新数据库中的 table。
问题是当 dt:table 有很多列(例如 25)时,这个过程可能会很耗时。所以我想知道是否有一种很好且有效的方法来 link 我在下面的查询中使用数据框列的“vals”变量?
下面的代码有效,但由于我的 DT:table 有超过 60 列,我真的不能坚持使用这个解决方案...:(
selected_row <- donnees[input$dt_rows_selected,]
query <- glue_sql('UPDATE myschema.mytable SET field1= ({vals*}), field2= ({vals2*}), field3 = ({vals3*}), field4= ({vals4*}), field5= ({vals5*}) WHERE id IN ({ID_field*});',
vals = selected_row$column1, vals2 = selected_row$column2, vals3= selected_row$column3, vals4= selected_row$column4, vals5= selected_row$column5, ID_field= selected_row$ID, .con = pool)
DBI::dbExecute(pool2, query)
这个答案的目的有两个:
- 演示(一个?)正确的 postgres 风格的 upsert 操作。我提供了一个
pg_upsert
函数,并在该函数中包含了(前缀为 #'#
)查询完成后的样子。查询是动态形成的,因此除了用户提供的 idfields=
参数外,不需要其他字段的先验知识。
- 演示如何使用此函数对
DT
编辑作出反应。这是一种方式,肯定还有其他方式来制定如何处理被动 DT
。如果您有不同的风格来跟踪 DT
中的变化,那么请随意使用 pg_upsert
和 运行!
备注:
它不会在每个单元格编辑时更新数据库,更改是“批处理”的,直到用户单击 Upsert!
按钮;更改为“在每个单元格上插入”是可行的,但这将是一个相对微不足道的查询,不需要更新插入
由于您使用的是 postgres,目标 table 必须具有一个或多个唯一索引(参见 No unique or exclusion constraint matching the ON CONFLICT);我将在 table 上创建示例数据和索引;如果您不明白这意味着什么并且您的数据没有明确的“id”字段,那么请执行我所做的:添加一个 id
列(本地和数据库中)序列沿着你的真实行(如果你的数据已经存在并且没有 id 字段,这将不起作用)
id 字段 不能是 editable,因此 DT 的 editable=
部分禁用更改该列;我包含了一个查询(在 中找到),它将以编程方式告诉您这些字段;如果这 return 没什么,则返回上一个项目符号并修复它
pg_upsert
函数需要几个步骤来确保事情是干净的(即检查重复的 id
s),但不检查不正确的新值( DT
为您做了一些,我相信 class
),我假设您在发送更新插入之前验证了您需要的内容;
pg_upsert
中的 return 值是合乎逻辑的,表明 upsert 操作更新了我们预期的行数;这个 可能 过于激进,虽然我想不出一个例子,它会正确 return 而不是 nrow(value)
;买者自负
我在闪亮的布局中包含一个 可选 "dbout"
table 只是为了显示数据库数据的当前状态,已更新每次 pg_upsert
被调用(间接);如果未进行任何更改,它仍会查询以显示当前状态,因此是显示测试开始条件的最佳方式;同样,它是可选的。当你删除它(你应该)并且没有其他使用 do_update()
反应时,然后更改
do_update <- eventReactive(input$upbtn, ...)
output$dbout <- renderTable({ do_update(); ... })
至
observeEvent(input$upbtn, ...)
# output$dbout <- renderTable({ do_update(); ... })
(否则,从未在下游使用的 reactive(.)
块将永远不会触发,因此您的更新不会发生。)
这个 应用程序查询数据库中的所有值(进入curdata
),这可能已经在您的案例中完成了。此应用程序还可以(以编程方式)查找所需的索引。如果您提前知道这些是什么,请随意删除提供 idfields
的查询并直接分配它(区分大小写)。
当app退出时,用户编辑的数据不会保存在本地R中console/environment,所有的修改都保存在数据库中。我假设这将被形式化为 shiny-server
、RStudio Connect 或类似的生产服务器,在这种情况下,“控制台”意义不大。如果您在开发应用程序时确实需要用户更改的数据在本地 R 控制台上可用,那么除了使用 mydata
反应值外,在重新分配 mydata$data
之后,您还可以覆盖 curdata <<- mydata$data
(注意 <<-
中的双重 <
)。我不鼓励在生产中使用这种做法,但在开发中可能会有用。
这是示例数据的设置。不管你有 6 列(如此处)还是 60 列,前提仍然存在。 (在此之后,origdata
没有被使用,这是为准备这个答案而准备的。)
# pgcon <- DBI::dbConnect(...)
set.seed(42)
origdata <- iris[sample(nrow(iris), 6),]
origdata$id <- seq_len(nrow(origdata))
# setup for this answer
DBI::dbExecute(pgcon, "drop table if exists mydata")
DBI::dbWriteTable(pgcon, "mydata", origdata)
# postgres upserts require 'unique' index on 'id'
DBI::dbExecute(pgcon, "create unique index mydata_id_idx on mydata (id)")
这里是 UPSERT 函数本身,分解出来以方便测试、控制台评估和类似操作。
#' @param value 'data.frame', values to be updated, does not need to
#' include all columns in the database
#' @param name 'character', the table name to receive the updated
#' values
#' @param idfields 'character', one or more id fields that are present
#' in both the 'value' and the database table, these cannot change
#' @param con database connection object, from [DBI::dbConnect()]
#' @param verbose 'logical', be verbose about operation, default true
#' @return logical, whether 'nrow(value)' rows were affected; if an
#' error occurred, it is messaged to the console and a `FALSE` is
#' returned
pg_upsert <- function(value, name, idfields, con = NULL, verbose = TRUE) {
if (verbose) message(Sys.time(), " upsert ", name, " with ", nrow(value), " rows")
if (any(duplicated(value[idfields]))) {
message("'value' contains duplicates in the idfields, upsert will not work")
return(FALSE)
}
tmptable <- paste(c("uptemp_", name, "_", sample(1e6, size = 1)), collapse = "")
on.exit({
DBI::dbExecute(con, paste("drop table if exists", tmptable))
}, add = TRUE)
DBI::dbWriteTable(con, tmptable, value)
cn <- colnames(value)
quotednms <- DBI::dbQuoteIdentifier(con, cn)
notid <- DBI::dbQuoteIdentifier(con, setdiff(cn, idfields))
qry <- sprintf(
"INSERT INTO %s ( %s )
SELECT %s FROM %s
ON CONFLICT ( %s ) DO
UPDATE SET %s",
name, paste(quotednms, collapse = " , "),
paste(quotednms, collapse = " , "), tmptable,
paste(DBI::dbQuoteIdentifier(con, idfields), collapse = " , "),
paste(paste(notid, paste0("EXCLUDED.", notid), sep = "="), collapse = " , "))
#'# INSERT INTO mydata ( "Sepal.Length" , "Petal.Length" )
#'# SELECT "Sepal.Length" , "Petal.Length" , "id" FROM mydata
#'# ON CONFLICT ( "id" ) DO
#'# UPDATE SET "Sepal.Length"=EXCLUDED."Sepal.Length" , "Petal.Length"=EXCLUDED."Petal.Length"
# dbExecute returns the number of rows affected, this ensures we
# return a logical "yes, all rows were updated" or "no, something
# went wrong"
res <- tryCatch(DBI::dbExecute(con, qry), error = function(e) e)
if (inherits(res, "error")) {
msg <- paste("error upserting data:", conditionMessage(res))
message(Sys.time(), " ", msg)
ret <- FALSE
attr(ret, "error") <- conditionMessage(res)
} else {
ret <- (res == nrow(value))
if (!ret) {
msg <- paste("expecting", nrow(value), "rows updated, returned", res, "rows updated")
message(Sys.time(), " ", msg)
attr(ret, "error") <- msg
}
}
ret
}
这是闪亮的应用程序。当你获取这个时,你可以立即按 Upsert!
来获取数据库的当前状态 table (同样,只是一个选项,生产不需要),不需要更新值来重新查询。
library(shiny)
library(DT)
pgcon <- DBI::dbConnect(...) # fix this incomplete expression
curdata <- DBI::dbGetQuery(pgcon, "select * from mydata order by id")
# if you don't know the idfield(s) offhand, then use this:
idfields <- DBI::dbGetQuery(pgcon, "
select
t.relname as table_name,
i.relname as index_name,
a.attname as column_name
from
pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
where
t.oid = ix.indrelid
and i.oid = ix.indexrelid
and a.attrelid = t.oid
and a.attnum = ANY(ix.indkey)
and t.relkind = 'r'
and t.relname = 'mydata'
order by
t.relname,
i.relname;")
idfieldnums <- which(colnames(curdata) %in% idfields$column_name)
shinyApp(
ui = fluidPage(
DTOutput("tbl"),
actionButton("upbtn", "UPSERT!"),
tableOutput("dbout")
),
server = function(input, output) {
mydata <- reactiveValues(data = curdata, changes = NULL)
output$tbl = renderDT(
mydata$data, options = list(lengthChange = FALSE),
editable = list(target = "cell", disable = list(columns = idfields)))
observeEvent(input$tbl_cell_edit, {
mydata$data <- editData(mydata$data, input$tbl_cell_edit)
mydata$changes <- rbind(
if (!is.null(mydata$changes)) mydata$changes,
input$tbl_cell_edit
)
# keep the most recent change to the same cell
dupes <- rev(duplicated(mydata$changes[rev(seq(nrow(mydata$changes))),c("row","col")]))
mydata$changes <- mydata$changes[!dupes,]
message(Sys.time(), " pending changes: ", nrow(mydata$changes))
})
do_update <- eventReactive(input$upbtn, {
if (isTRUE(nrow(mydata$changes) > 0)) {
# always include the 'id' field(s)
# idcol <- which(colnames(mydata$data) == "id")
updateddata <- mydata$data[ mydata$changes$row, c(mydata$changes$col, idfieldnums) ]
res <- pg_upsert(updateddata, "mydata", idfields = "id", con = pgcon)
# clear the stored changes only if the upsert was successful
if (res) mydata$changes <- mydata$changes[0,]
}
input$upbtn
})
output$dbout <- renderTable({
do_update() # react when changes are attempted, the button is pressed
message(Sys.time(), " query 'mydata'")
DBI::dbGetQuery(pgcon, "select * from mydata order by id")
})
}
)
进行中:
- (左)当我们开始时,我们看到原始的
DT
并且没有数据库输出。
- (中)按
Upsert!
按钮只是为了查询数据库并显示可选的table。
- (右)进行更新,然后按
Upsert!
,数据库更新(下层table重新查询)。
我正在构建一个闪亮的应用程序,用户可以通过编辑 DT:table 中的选定行来更新数据库中的 table。
问题是当 dt:table 有很多列(例如 25)时,这个过程可能会很耗时。所以我想知道是否有一种很好且有效的方法来 link 我在下面的查询中使用数据框列的“vals”变量?
下面的代码有效,但由于我的 DT:table 有超过 60 列,我真的不能坚持使用这个解决方案...:(
selected_row <- donnees[input$dt_rows_selected,]
query <- glue_sql('UPDATE myschema.mytable SET field1= ({vals*}), field2= ({vals2*}), field3 = ({vals3*}), field4= ({vals4*}), field5= ({vals5*}) WHERE id IN ({ID_field*});',
vals = selected_row$column1, vals2 = selected_row$column2, vals3= selected_row$column3, vals4= selected_row$column4, vals5= selected_row$column5, ID_field= selected_row$ID, .con = pool)
DBI::dbExecute(pool2, query)
这个答案的目的有两个:
- 演示(一个?)正确的 postgres 风格的 upsert 操作。我提供了一个
pg_upsert
函数,并在该函数中包含了(前缀为#'#
)查询完成后的样子。查询是动态形成的,因此除了用户提供的idfields=
参数外,不需要其他字段的先验知识。 - 演示如何使用此函数对
DT
编辑作出反应。这是一种方式,肯定还有其他方式来制定如何处理被动DT
。如果您有不同的风格来跟踪DT
中的变化,那么请随意使用pg_upsert
和 运行!
备注:
它不会在每个单元格编辑时更新数据库,更改是“批处理”的,直到用户单击
Upsert!
按钮;更改为“在每个单元格上插入”是可行的,但这将是一个相对微不足道的查询,不需要更新插入由于您使用的是 postgres,目标 table 必须具有一个或多个唯一索引(参见 No unique or exclusion constraint matching the ON CONFLICT);我将在 table 上创建示例数据和索引;如果您不明白这意味着什么并且您的数据没有明确的“id”字段,那么请执行我所做的:添加一个
id
列(本地和数据库中)序列沿着你的真实行(如果你的数据已经存在并且没有 id 字段,这将不起作用)id 字段 不能是 editable,因此 DT 的
editable=
部分禁用更改该列;我包含了一个查询(在 中找到),它将以编程方式告诉您这些字段;如果这 return 没什么,则返回上一个项目符号并修复它pg_upsert
函数需要几个步骤来确保事情是干净的(即检查重复的id
s),但不检查不正确的新值(DT
为您做了一些,我相信class
),我假设您在发送更新插入之前验证了您需要的内容;pg_upsert
中的 return 值是合乎逻辑的,表明 upsert 操作更新了我们预期的行数;这个 可能 过于激进,虽然我想不出一个例子,它会正确 return 而不是nrow(value)
;买者自负我在闪亮的布局中包含一个 可选
"dbout"
table 只是为了显示数据库数据的当前状态,已更新每次pg_upsert
被调用(间接);如果未进行任何更改,它仍会查询以显示当前状态,因此是显示测试开始条件的最佳方式;同样,它是可选的。当你删除它(你应该)并且没有其他使用do_update()
反应时,然后更改do_update <- eventReactive(input$upbtn, ...) output$dbout <- renderTable({ do_update(); ... })
至
observeEvent(input$upbtn, ...) # output$dbout <- renderTable({ do_update(); ... })
(否则,从未在下游使用的
reactive(.)
块将永远不会触发,因此您的更新不会发生。)这个 应用程序查询数据库中的所有值(进入
curdata
),这可能已经在您的案例中完成了。此应用程序还可以(以编程方式)查找所需的索引。如果您提前知道这些是什么,请随意删除提供idfields
的查询并直接分配它(区分大小写)。当app退出时,用户编辑的数据不会保存在本地R中console/environment,所有的修改都保存在数据库中。我假设这将被形式化为
shiny-server
、RStudio Connect 或类似的生产服务器,在这种情况下,“控制台”意义不大。如果您在开发应用程序时确实需要用户更改的数据在本地 R 控制台上可用,那么除了使用mydata
反应值外,在重新分配mydata$data
之后,您还可以覆盖curdata <<- mydata$data
(注意<<-
中的双重<
)。我不鼓励在生产中使用这种做法,但在开发中可能会有用。
这是示例数据的设置。不管你有 6 列(如此处)还是 60 列,前提仍然存在。 (在此之后,origdata
没有被使用,这是为准备这个答案而准备的。)
# pgcon <- DBI::dbConnect(...)
set.seed(42)
origdata <- iris[sample(nrow(iris), 6),]
origdata$id <- seq_len(nrow(origdata))
# setup for this answer
DBI::dbExecute(pgcon, "drop table if exists mydata")
DBI::dbWriteTable(pgcon, "mydata", origdata)
# postgres upserts require 'unique' index on 'id'
DBI::dbExecute(pgcon, "create unique index mydata_id_idx on mydata (id)")
这里是 UPSERT 函数本身,分解出来以方便测试、控制台评估和类似操作。
#' @param value 'data.frame', values to be updated, does not need to
#' include all columns in the database
#' @param name 'character', the table name to receive the updated
#' values
#' @param idfields 'character', one or more id fields that are present
#' in both the 'value' and the database table, these cannot change
#' @param con database connection object, from [DBI::dbConnect()]
#' @param verbose 'logical', be verbose about operation, default true
#' @return logical, whether 'nrow(value)' rows were affected; if an
#' error occurred, it is messaged to the console and a `FALSE` is
#' returned
pg_upsert <- function(value, name, idfields, con = NULL, verbose = TRUE) {
if (verbose) message(Sys.time(), " upsert ", name, " with ", nrow(value), " rows")
if (any(duplicated(value[idfields]))) {
message("'value' contains duplicates in the idfields, upsert will not work")
return(FALSE)
}
tmptable <- paste(c("uptemp_", name, "_", sample(1e6, size = 1)), collapse = "")
on.exit({
DBI::dbExecute(con, paste("drop table if exists", tmptable))
}, add = TRUE)
DBI::dbWriteTable(con, tmptable, value)
cn <- colnames(value)
quotednms <- DBI::dbQuoteIdentifier(con, cn)
notid <- DBI::dbQuoteIdentifier(con, setdiff(cn, idfields))
qry <- sprintf(
"INSERT INTO %s ( %s )
SELECT %s FROM %s
ON CONFLICT ( %s ) DO
UPDATE SET %s",
name, paste(quotednms, collapse = " , "),
paste(quotednms, collapse = " , "), tmptable,
paste(DBI::dbQuoteIdentifier(con, idfields), collapse = " , "),
paste(paste(notid, paste0("EXCLUDED.", notid), sep = "="), collapse = " , "))
#'# INSERT INTO mydata ( "Sepal.Length" , "Petal.Length" )
#'# SELECT "Sepal.Length" , "Petal.Length" , "id" FROM mydata
#'# ON CONFLICT ( "id" ) DO
#'# UPDATE SET "Sepal.Length"=EXCLUDED."Sepal.Length" , "Petal.Length"=EXCLUDED."Petal.Length"
# dbExecute returns the number of rows affected, this ensures we
# return a logical "yes, all rows were updated" or "no, something
# went wrong"
res <- tryCatch(DBI::dbExecute(con, qry), error = function(e) e)
if (inherits(res, "error")) {
msg <- paste("error upserting data:", conditionMessage(res))
message(Sys.time(), " ", msg)
ret <- FALSE
attr(ret, "error") <- conditionMessage(res)
} else {
ret <- (res == nrow(value))
if (!ret) {
msg <- paste("expecting", nrow(value), "rows updated, returned", res, "rows updated")
message(Sys.time(), " ", msg)
attr(ret, "error") <- msg
}
}
ret
}
这是闪亮的应用程序。当你获取这个时,你可以立即按 Upsert!
来获取数据库的当前状态 table (同样,只是一个选项,生产不需要),不需要更新值来重新查询。
library(shiny)
library(DT)
pgcon <- DBI::dbConnect(...) # fix this incomplete expression
curdata <- DBI::dbGetQuery(pgcon, "select * from mydata order by id")
# if you don't know the idfield(s) offhand, then use this:
idfields <- DBI::dbGetQuery(pgcon, "
select
t.relname as table_name,
i.relname as index_name,
a.attname as column_name
from
pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
where
t.oid = ix.indrelid
and i.oid = ix.indexrelid
and a.attrelid = t.oid
and a.attnum = ANY(ix.indkey)
and t.relkind = 'r'
and t.relname = 'mydata'
order by
t.relname,
i.relname;")
idfieldnums <- which(colnames(curdata) %in% idfields$column_name)
shinyApp(
ui = fluidPage(
DTOutput("tbl"),
actionButton("upbtn", "UPSERT!"),
tableOutput("dbout")
),
server = function(input, output) {
mydata <- reactiveValues(data = curdata, changes = NULL)
output$tbl = renderDT(
mydata$data, options = list(lengthChange = FALSE),
editable = list(target = "cell", disable = list(columns = idfields)))
observeEvent(input$tbl_cell_edit, {
mydata$data <- editData(mydata$data, input$tbl_cell_edit)
mydata$changes <- rbind(
if (!is.null(mydata$changes)) mydata$changes,
input$tbl_cell_edit
)
# keep the most recent change to the same cell
dupes <- rev(duplicated(mydata$changes[rev(seq(nrow(mydata$changes))),c("row","col")]))
mydata$changes <- mydata$changes[!dupes,]
message(Sys.time(), " pending changes: ", nrow(mydata$changes))
})
do_update <- eventReactive(input$upbtn, {
if (isTRUE(nrow(mydata$changes) > 0)) {
# always include the 'id' field(s)
# idcol <- which(colnames(mydata$data) == "id")
updateddata <- mydata$data[ mydata$changes$row, c(mydata$changes$col, idfieldnums) ]
res <- pg_upsert(updateddata, "mydata", idfields = "id", con = pgcon)
# clear the stored changes only if the upsert was successful
if (res) mydata$changes <- mydata$changes[0,]
}
input$upbtn
})
output$dbout <- renderTable({
do_update() # react when changes are attempted, the button is pressed
message(Sys.time(), " query 'mydata'")
DBI::dbGetQuery(pgcon, "select * from mydata order by id")
})
}
)
进行中:
- (左)当我们开始时,我们看到原始的
DT
并且没有数据库输出。 - (中)按
Upsert!
按钮只是为了查询数据库并显示可选的table。 - (右)进行更新,然后按
Upsert!
,数据库更新(下层table重新查询)。