以向量为输入查询R中不同时间序列的函数

Function to query different time series in R with vector as input

我正在获取数据库事实 table 中特定时间序列的最小最大日期,如下所示:

auxiliar.dates <- function(machine, signal) {
  q.Aux1 <- paste("SELECT
         t1.machine,
       t1.signal,
       t2.signal_name,
       t1.min_snsr_dt,
       t1.max_snsr_dt,
       t1.min_snsr_ts,
       t1.max_snsr_ts,
       t1.min_etl_dt,
       t1.max_etl_dt,
       t1.rec_cnt
       FROM ", config$SF_CONFIG$my_schema_name1, ".mytable1 AS t1 
       LEFT JOIN ", config$SF_CONFIG$my_schema_name1, ".mytable2", "AS t2
       ON t1.signal=t2.signal 
       WHERE t1.unit_key=")
  q.Aux2 <- " AND t1.signal="
  q.Aux.final <- str_c(q.Aux1, machine, q.Aux2, signal)
  res <- dbSendQuery(myConn, q.Aux.final)
  df <- as.data.table(dbFetch(res, n=-1))
  dbClearResult(res)
  return(df)
}

dates <-auxiliar.dates("machine", "signal")

这个函数的输出是一个数据table如下:

然后我使用输出查询最小和最大ts之间的特定信号如下:

signalQuery <- function(machine, signal, min_ts, max_ts) {

  q1.aux1 <- paste("SELECT snsr_val, 
                      snsr_ts, 
                      snsr_dt, 
                      signal,
                      qual, 
                      machine 
                      FROM ", config$SF_CONFIG$schema_name1, 
                     ".mytable1 AS v
                      WHERE machine=", sep="")

  q3.aux1 <-paste(" AND signal=", signal, " AND snsr_ts BETWEEN ", "'", min_ts, "'",
                    " AND ", "'", max_ts, "'", " ORDER BY v.snsr_ts", sep = "")

  qt.auxtotal <- str_c(q1.aux1,
                     machine,
                     q3.aux1) #we join que full query with stringr library

  res <- dbSendQuery(myConn, qt.auxtotal)
  df <- as.data.table(dbFetch(res,n=-1))
  dbClearResult(res) #cleaning memory
  return(df)
}

调用信号 71,例如我正在做的:

    signal71.dates <- auxiliar.dates(machine, 71)
    df   <- signalQuery(machine, 71, signal71.dates$min_snsr_dt, signal71.dates$max_snsr_dt)

如果我需要查询更多信号,我会执行完全相同的过程,但我会使用我的数据帧调用 signal_number.dates 的 max_snsr_dt 的最小值和 signal_number.dates 的最大值 min_snsr_dt 我的数据帧 signal_number.dates.

我想知道稍微改变一下流程,并能够在 auxiliar.dates 和 signalQuery 函数中输入我想要的信号向量。

我第一次尝试修改auxiliar.dates:

q.Aux2 <- " AND t1.signal="

至:

q.Aux2 <- " AND t1.signal IN ("
q.Aux.final <- str_c(q.Aux1, machine, q.Aux2, paste(signal, ")", sep = ""))

但是当我调用函数时:

test <- auxiliar.dates(984, c(70,71))

我收到以下错误:

Error in new_result(connection@ptr, statement) : Expecting a single string value: [type=character; extent=2].

会有人支持吗?

BR

考虑以下更改:

  • 参数化:避免过多的字符串拼接,影响可读性和可维护性。而是使用 DBI + odbcsqlInterpolate 支持的参数化。理想情况下,您会在 SQL 字符串语句中对 table 名称进行硬编码,但由于无法参数化标识符,因此 paste (或 paste0 之间没有空格)仍然必须被使用。

  • 单个 SQL 查询:使用 Snowflake 支持的 Common Table Expression (CTE) 组合两个 SQL 查询。具体来说,第一个查询通过 machinesignal 和日期 BETWEEN 间隔连接到最后一个查询。反过来,您将这两个功能结合起来,减少数据库访问次数,并避免中间的辅助对象。

  • 使用dbGetQuery:如果数据加载不是问题,需要按块获取大型结果集,请使用dbGetQuery结合 dbSendQuerydbFetch 步骤进行简洁。

  • 函数输入:正如@r2evans 评论的那样,避免依赖未知父源的环境变量位于本地函数中。相反,为局部范围变量传递所有需要的输入参数。

  • Iteration:因为这些函数使用标量参数,所以必须多次迭代值,例如 with lapply 到 运行 函数次,然后行绑定最终数据的结果 table.

单一功能

signalQuery <- function(my_schema, machine, signal) { 
    # PREPARED STATEMENT 
    sql <- paste0("WITH sub AS 
                     (SELECT t1.machine, t1.signal,  t2.signal_name, 
                             t1.min_snsr_dt, t1.max_snsr_dt,
                             t1.min_snsr_ts, t1.max_snsr_ts, 
                             t1.min_etl_dt,  t1.max_etl_dt, t1.rec_cnt
                      FROM ", my_schema, ".mytable1 AS t1 
                      LEFT JOIN ", my_schema, ".mytable2", "AS t2
                          ON t1.signal = t2.signal 
                      WHERE t1.unit_key = ?m_param AND t1.signal= ?s_param)

                   SELECT v.snsr_val, v.snsr_ts, v.snsr_dt, v.signal, 
                          v.qual, v.machine 
                   FROM ", my_schema, ".mytable1 AS v
                   INNER JOIN sub
                     ON v.machine = sub.machine
                     AND v.signal = sub.signal
                     AND v.snsr_ts BETWEEN sub.min_snsr_dt AND sub.max_snsr_dt
                   ORDER BY v.snsr_ts")

    # BIND PARAMS TO ?MARK PLACEHOLDERS
    query <- sqlInterpolate(conn, sql, m_param = machine, s_param = signal)

    # RUN QUERY
    dt <- as.data.table(dbGetQuery(myConn, query))

    return(dt)    
}

函数调用

# SINGLE SIGNAL VALUE
q.Aux.final <- signalQuery(myschema = config$SF_CONFIG$my_schema_name1,
                           machine = 984, signal = 70)

# MULTIPLE SIGNAL VALUES
dt_list <- lapply(c(70,71), function(i) 
                    signalQuery(myschema = config$SF_CONFIG$my_schema_name1,
                                machine = 984, signal = i)
           )

q.Aux.final <- data.table::rbindlist(dt_list)

多功能

如果您确实需要第一个结果集来满足分析需求,请在没有 CTE 的情况下继续相同的过程:

auxiliar.dates <- function(my_schema, machine, signal) { 

    sql <- paste0("SELECT t1.machine, t1.signal,  t2.signal_name, 
                          t1.min_snsr_dt, t1.max_snsr_dt,
                          t1.min_snsr_ts, t1.max_snsr_ts, 
                          t1.min_etl_dt,  t1.max_etl_dt, t1.rec_cnt
                   FROM ", my_schema, ".mytable1 AS t1 
                   LEFT JOIN ", my_schema, ".mytable2", "AS t2
                         ON t1.signal=t2.signal 
                   WHERE t1.unit_key = ?m_param AND t1.signal= ?s_param")

    query <- sqlInterpolate(conn, sql, m_param = machine, s_param = signal)
    dt <- as.data.table(dbGetQuery(myConn, query))

    return(dt)    
}


signalQuery <- function(my_schema, machine, signal, min_ts, max_ts) {

    sql <- paste0("SELECT v.snsr_val, v.snsr_ts, v.snsr_dt, v.signal, 
                          v.qual, v.machine 
                   FROM ", my_schema, ".mytable1 AS v
                   WHERE v.machine = ?m_param
                     AND v.signal = ?s_param
                     AND v.snsr_ts BETWEEN ?min_ts_prm AND ?max_ts_prm
                   ORDER BY v.snsr_ts")

    query <- sqlInterpolate(conn, sql, m_param = machine, s_param = signal,
                            min_ts_prm = min_ts, max_ts_prm = max_ts)
    dt <- as.data.table(dbGetQuery(myConn, query))

    return(dt)    
}

函数调用

# SINGLE SIGNAL VALUE
signal71.dates <- auxiliar.dates(config$SF_CONFIG$my_schema_name1, 984, 71)

q.Aux.final <- signalQuery(config$SF_CONFIG$my_schema_name1, 984, 71,
                           signal71.dates$min_snsr_dt, signal71.dates$max_snsr_dt)

# MULTIPLE SIGNAL VALUES
dt_list <- lapply(c(70,71), function(i) 
                    signalQuery(myschema = config$SF_CONFIG$my_schema_name1,
                                machine = 984, signal = i)
           )

signal.dates_dt <- data.table::rbindlist(dt_list)


dt_list <- lapply(1:nrow(signal.dates_dt), function(i) 
                    signalQuery(myschema = config$SF_CONFIG$my_schema_name1,
                                machine  = signal.dates_dt$machine[i], 
                                signal   = signal.dates_dt$signal[i],
                                min_ts   = signal.dates$min_snsr_dt[i],
                                max_ts   = signal.dates$max_snsr_dt[i])
           )

q.Aux.final <- data.table::rbindlist(dt_list)

更新:错误已解决,连接器已过期我需要它重新连接

非常感谢您的解决方案。但是,每当将两个模式用作输入时,我都会收到错误消息。

auxiliar.dates <- function(connection, my_schema1, my_schema2, machine, signal) { 

  sql <- paste0("SELECT t1.machine, t1.signal,  t2.signal_name, 
                          t1.min_snsr_dt, t1.max_snsr_dt,
                          t1.min_snsr_ts, t1.max_snsr_ts, 
                          t1.min_etl_dt,  t1.max_etl_dt, t1.rec_cnt
                   FROM ", my_schema1, ".table1 AS t1 
                   LEFT JOIN ", my_schema2, ".table2", " AS t2
                         ON t1.snsr_key = t2.snsr_key
                   WHERE t1.machine = ?m_param AND t1.signal = ?s_param")

  query <- sqlInterpolate(connection, sql, m_param = machine, s_param = signal)
  dt <- as.data.table(dbGetQuery(connection, query))

  return(dt)    
}`

但是我得到以下错误:

 signal1.dates <- auxiliar.dates(myConn, config$SF_CONFIG$my_schema1, config$SF_CONFIG$my_schema2, machine.number, signal.number)
 Error in (function (classes, fdef, mtable)  : 
  unable to find an inherited method for function ‘sqlInterpolate’ for signature ‘"Snowflake"’ 

你知道为什么会这样吗?当我尝试仅使用一个输入并且未将连接指定为函数的一部分时,它工作正常。