如何有效地将许多变量粘贴到 sql 查询中(R Shiny)

How to efficiently paste many variables into a sql query (Rshiny)

我正在构建一个闪亮的应用程序,用户可以通过编辑 DT:table 中的选定行来更新数据库中的 table。

问题是当 dt:table 有很多列(例如 25)时,这个过程可能会很耗时。所以我想知道是否有一种很好且有效的方法来 link 我在下面的查询中使用数据框列的“vals”变量?

下面的代码有效,但由于我的 DT:table 有超过 60 列,我真的不能坚持使用这个解决方案...:(

selected_row <- donnees[input$dt_rows_selected,]

query <- glue_sql('UPDATE myschema.mytable SET field1= ({vals*}), field2= ({vals2*}), field3 = ({vals3*}), field4= ({vals4*}), field5= ({vals5*}) WHERE id IN ({ID_field*});',
                          vals = selected_row$column1, vals2 = selected_row$column2, vals3= selected_row$column3, vals4= selected_row$column4, vals5= selected_row$column5, ID_field= selected_row$ID, .con = pool)
    
DBI::dbExecute(pool2, query)

这个答案的目的有两个:

  • 演示(一个?)正确的 postgres 风格的 upsert 操作。我提供了一个 pg_upsert 函数,并在该函数中包含了(前缀为 #'#)查询完成后的样子。查询是动态形成的,因此除了用户提供的 idfields= 参数外,不需要其他字段的先验知识。
  • 演示如何使用此函数对 DT 编辑作出反应。这是一种方式,肯定还有其他方式来制定如何处理被动 DT。如果您有不同的风格来跟踪 DT 中的变化,那么请随意使用 pg_upsert 和 运行!

备注:

  • 它不会在每个单元格编辑时更新数据库,更改是“批处理”的,直到用户单击 Upsert! 按钮;更改为“在每个单元格上插入”是可行的,但这将是一个相对微不足道的查询,不需要更新插入

  • 由于您使用的是 postgres,目标 table 必须具有一个或多个唯一索引(参见 No unique or exclusion constraint matching the ON CONFLICT);我将在 table 上创建示例数据和索引;如果您不明白这意味着什么并且您的数据没有明确的“id”字段,那么请执行我所做的:添加一个 id 列(本地和数据库中)序列沿着你的真实行(如果你的数据已经存在并且没有 id 字段,这将不起作用)

  • id 字段 不能是 editable,因此 DT 的 editable= 部分禁用更改该列;我包含了一个查询(在 中找到),它将以编程方式告诉您这些字段;如果这 return 没什么,则返回上一个项目符号并修复它

  • pg_upsert 函数需要几个步骤来确保事情是干净的(即检查重复的 ids),但不检查不正确的新值( DT 为您做了一些,我相信 class),我假设您在发送更新插入之前验证了您需要的内容;

  • pg_upsert 中的 return 值是合乎逻辑的,表明 upsert 操作更新了我们预期的行数;这个 可能 过于激进,虽然我想不出一个例子,它会正确 return 而不是 nrow(value);买者自负

  • 我在闪亮的布局中包含一个 可选 "dbout" table 只是为了显示数据库数据的当前状态,已更新每次 pg_upsert 被调用(间接);如果未进行任何更改,它仍会查询以显示当前状态,因此是显示测试开始条件的最佳方式;同样,它是可选的。当你删除它(你应该)并且没有其他使用 do_update() 反应时,然后更改

    do_update <- eventReactive(input$upbtn, ...)
    output$dbout <- renderTable({ do_update(); ... })
    

    observeEvent(input$upbtn, ...)
    # output$dbout <- renderTable({ do_update(); ... })
    

    (否则,从未在下游使用的 reactive(.) 块将永远不会触发,因此您的更新不会发生。)

  • 这个 应用程序查询数据库中的所有值(进入curdata),这可能已经在您的案例中完成了。此应用程序还可以(以编程方式)查找所需的索引。如果您提前知道这些是什么,请随意删除提供 idfields 的查询并直接分配它(区分大小写)。

  • 当app退出时,用户编辑的数据不会保存在本地R中console/environment,所有的修改都保存在数据库中。我假设这将被形式化为 shiny-server、RStudio Connect 或类似的生产服务器,在这种情况下,“控制台”意义不大。如果您在开发应用程序时确实需要用户更改的数据在本地 R 控制台上可用,那么除了使用 mydata 反应值外,在重新分配 mydata$data 之后,您还可以覆盖 curdata <<- mydata$data(注意 <<- 中的双重 <)。我不鼓励在生产中使用这种做法,但在开发中可能会有用。

这是示例数据的设置。不管你有 6 列(如此处)还是 60 列,前提仍然存在。 (在此之后,origdata 没有被使用,这是为准备这个答案而准备的。)

# pgcon <- DBI::dbConnect(...)
set.seed(42)
origdata <- iris[sample(nrow(iris), 6),]
origdata$id <- seq_len(nrow(origdata))
# setup for this answer
DBI::dbExecute(pgcon, "drop table if exists mydata")
DBI::dbWriteTable(pgcon, "mydata", origdata)
# postgres upserts require 'unique' index on 'id'
DBI::dbExecute(pgcon, "create unique index mydata_id_idx on mydata (id)")

这里是 UPSERT 函数本身,分解出来以方便测试、控制台评估和类似操作。

#' @param value 'data.frame', values to be updated, does not need to
#'   include all columns in the database
#' @param name 'character', the table name to receive the updated
#'   values
#' @param idfields 'character', one or more id fields that are present
#'   in both the 'value' and the database table, these cannot change
#' @param con database connection object, from [DBI::dbConnect()]
#' @param verbose 'logical', be verbose about operation, default true
#' @return logical, whether 'nrow(value)' rows were affected; if an
#'   error occurred, it is messaged to the console and a `FALSE` is
#'   returned
pg_upsert <- function(value, name, idfields, con = NULL, verbose = TRUE) {
  if (verbose) message(Sys.time(), " upsert ", name, " with ", nrow(value), " rows")
  if (any(duplicated(value[idfields]))) {
    message("'value' contains duplicates in the idfields, upsert will not work")
    return(FALSE)
  }
  tmptable <- paste(c("uptemp_", name, "_", sample(1e6, size = 1)), collapse = "")
  on.exit({
    DBI::dbExecute(con, paste("drop table if exists", tmptable))
  }, add = TRUE)
  DBI::dbWriteTable(con, tmptable, value)
  cn <- colnames(value)
  quotednms <- DBI::dbQuoteIdentifier(con, cn)
  notid <- DBI::dbQuoteIdentifier(con, setdiff(cn, idfields))
  qry <- sprintf(
    "INSERT INTO %s ( %s )
     SELECT %s FROM %s
     ON CONFLICT ( %s ) DO
     UPDATE SET %s",
    name, paste(quotednms, collapse = " , "),
    paste(quotednms, collapse = " , "), tmptable,
    paste(DBI::dbQuoteIdentifier(con, idfields), collapse = " , "),
    paste(paste(notid, paste0("EXCLUDED.", notid), sep = "="), collapse = " , "))
  #'# INSERT INTO mydata ( "Sepal.Length" , "Petal.Length" )
  #'#      SELECT "Sepal.Length" , "Petal.Length" , "id" FROM mydata
  #'#      ON CONFLICT ( "id" ) DO
  #'#      UPDATE SET "Sepal.Length"=EXCLUDED."Sepal.Length" , "Petal.Length"=EXCLUDED."Petal.Length"
  # dbExecute returns the number of rows affected, this ensures we
  # return a logical "yes, all rows were updated" or "no, something
  # went wrong"
  res <- tryCatch(DBI::dbExecute(con, qry), error = function(e) e)
  if (inherits(res, "error")) {
    msg <- paste("error upserting data:", conditionMessage(res))
    message(Sys.time(), " ", msg)
    ret <- FALSE
    attr(ret, "error") <- conditionMessage(res)
  } else {
    ret <- (res == nrow(value))
    if (!ret) {
      msg <- paste("expecting", nrow(value), "rows updated, returned", res, "rows updated")
      message(Sys.time(), " ", msg)
      attr(ret, "error") <- msg
    }
  }
  ret
}

这是闪亮的应用程序。当你获取这个时,你可以立即按 Upsert! 来获取数据库的当前状态 table (同样,只是一个选项,生产不需要),不需要更新值来重新查询。

library(shiny)
library(DT)

pgcon <- DBI::dbConnect(...) # fix this incomplete expression

curdata <- DBI::dbGetQuery(pgcon, "select * from mydata order by id")
# if you don't know the idfield(s) offhand, then use this:
idfields <- DBI::dbGetQuery(pgcon, "
  select
      t.relname as table_name,
      i.relname as index_name,
      a.attname as column_name
  from
      pg_class t,
      pg_class i,
      pg_index ix,
      pg_attribute a
  where
      t.oid = ix.indrelid
      and i.oid = ix.indexrelid
      and a.attrelid = t.oid
      and a.attnum = ANY(ix.indkey)
      and t.relkind = 'r'
      and t.relname = 'mydata'
  order by
      t.relname,
      i.relname;")
idfieldnums <- which(colnames(curdata) %in% idfields$column_name)

shinyApp(
  ui = fluidPage(
    DTOutput("tbl"),
    actionButton("upbtn", "UPSERT!"),
    tableOutput("dbout")
  ),
  server = function(input, output) {

    mydata <- reactiveValues(data = curdata, changes = NULL)

    output$tbl = renderDT(
      mydata$data, options = list(lengthChange = FALSE),
      editable = list(target = "cell", disable = list(columns = idfields)))

    observeEvent(input$tbl_cell_edit, {
      mydata$data <- editData(mydata$data, input$tbl_cell_edit)
      mydata$changes <- rbind(
        if (!is.null(mydata$changes)) mydata$changes,
        input$tbl_cell_edit
      )
      # keep the most recent change to the same cell
      dupes <- rev(duplicated(mydata$changes[rev(seq(nrow(mydata$changes))),c("row","col")]))
      mydata$changes <- mydata$changes[!dupes,]
      message(Sys.time(), " pending changes: ", nrow(mydata$changes))
    })

    do_update <- eventReactive(input$upbtn, {
      if (isTRUE(nrow(mydata$changes) > 0)) {
        # always include the 'id' field(s)
        # idcol <- which(colnames(mydata$data) == "id")
        updateddata <- mydata$data[ mydata$changes$row, c(mydata$changes$col, idfieldnums) ]
        res <- pg_upsert(updateddata, "mydata", idfields = "id", con = pgcon)
        # clear the stored changes only if the upsert was successful
        if (res) mydata$changes <- mydata$changes[0,]
      }
      input$upbtn
    })

    output$dbout <- renderTable({
      do_update() # react when changes are attempted, the button is pressed
      message(Sys.time(), " query 'mydata'")
      DBI::dbGetQuery(pgcon, "select * from mydata order by id")
    })

  }
)

进行中:

  • (左)当我们开始时,我们看到原始的DT并且没有数据库输出。
  • (中)按Upsert!按钮只是为了查询数据库并显示可选的table。
  • (右)进行更新,然后按Upsert!,数据库更新(下层table重新查询)。