将 R 中列的最后 N 天覆盖为雪花
Overwrite Last N Days from column in R to Snowflake
出于示例目的,我使用的是 tidyquant 数据集。
install.packages('tidyquant')
library(tidyquant)
library(data.table)
options("getSymbols.warning4.0"=FALSE)
options("getSymbols.yahoo.warning"=FALSE)
# Downloading Apple price using quantmod
first.date <- Sys.Date() - 30
last.date <- Sys.Date()
getSymbols("AAPL", from = first.date,
to = last.date,warnings = FALSE,
auto.assign = TRUE)
AAPL<-data.frame(AAPL)
AAPL<- setDT(AAPL, keep.rownames = TRUE)[]
colnames(AAPL)[1] <- "DATE"
AAPL$RUNDATE<-AAPL$DATE
head(AAPL)
DATE AAPL.Open AAPL.High AAPL.Low AAPL.Close AAPL.Volume AAPL.Adjusted RUNDATE
1: 2021-03-19 119.90 121.43 119.68 119.99 185023200 119.99 2021-03-19
2: 2021-03-22 120.33 123.87 120.26 123.39 111912300 123.39 2021-03-22
3: 2021-03-23 123.33 124.24 122.14 122.54 95467100 122.54 2021-03-23
4: 2021-03-24 122.82 122.90 120.07 120.09 88530500 120.09 2021-03-24
5: 2021-03-25 119.54 121.66 119.00 120.59 98844700 120.59 2021-03-25
6: 2021-03-26 120.35 121.48 118.92 121.21 93958900 121.21 2021-03-26
这是我的目标,我已经想出如何使用以下命令使用 Databricks 中的 R 将此数据集上传到雪花中:
sparkr.sdf <- SparkR::createDataFrame(AAPL)
SparkR::write.df(
df = sparkr.sdf,
source = "snowflake",
mode = "overwrite",
sfUrl = "<snowflake-url>",
sfUser = user,
sfPassword = password,
sfDatabase = "<snowflake-database>",
sfSchema = "<snowflake-schema>",
sfWarehouse = "<snowflake-cluster>",
dbtable = "AAPL_table"
)
但是,我想每天 运行 上传最新日期的拉取(最近 30 天),追加并覆盖 RunDate 列中的最近 30 天。
我的想法是继续使用上面的相同查询,只是将 运行 日期列更改为今天的日期。 AAPL$RUNDATE<-Sys.Date()
每天。这样当我 运行 这个时,我可以从这个新的 AAPL$RUNDATE
中减去 30 天,并且只保留 AAPL$RUNDATE 向前移动的 30 天时间范围内的任何内容。
我看到了这段代码,但没有看到设置回顾限制的选项
DBI::dbSendQuery(snowflake.conn,"use schema schemaname") # strange this is required
dbWriteTable(snowflake.conn, 'tablename', df, append = T, overwrite = F, verbose = T)
或者这个
source:
# Write table
sparklyr.sdf <- copy_to(sc, iris, overwrite = TRUE)
sparklyr.sdf %>%
spark_write_source(
sc = sc,
mode = "overwrite",
source = "snowflake",
options = sf_options
)
我的想法是 运行 这个数据每天提取过去 30 天的数据,而不是整个时间段,并附加到现有数据集。
以前有没有人做过类似的事情,非常感谢对此的任何帮助,如果您需要更清楚地了解我正在努力完成的事情,请告诉我
假设您 运行 您的代码获取新数据并存储在名为 AAPL
的变量中,那么
AAPL$RUNDATE <- Sys.Date()
DBI::dbWriteTable(conn, AAPL, append = TRUE)
从这里开始,我认为您有两个选择:
删除超过 30 天的数据。在将新数据附加到 table 之后,然后 运行
DBI::dbExecute(conn,
"delete from AAPL_table
where DATEDIFF(day, RUNDATE, CURRENT_DATE()) > 30")
(使用 DATEDIFF
and CURRENT_DATE
中的示例形成。)
作为替代方案,如果您今天没有运行清除,您可能更愿意使用RUNDATE
中观察到的最大值table 而不是今天的日期,在这种情况下您可以使用
maxdate <- DBI::dbGetQuery(conn, "select max(RUNDATE) as maxdate from AAPL_table")$maxdate
if (length(maxdate) && !anyNA(maxdate)) {
DBI::dbExecute(conn,
"delete from AAPL_table where DATEDIFF(day, RUNDATE, ?) > 30", params = list(maxdate))
}
保留 table 中的旧数据,每当您从雪花实例中检索它时,只检索最近 30 天的价值:
x <- DBI::dbGetQuery(conn,
"select ... from AAPL_table where RUNDATE >= ?", params = list(Sys.Date()))
(警告:我没有雪花,所以这是未经测试的。我在sql服务器上测试了它的前提没有问题,并验证了至少两个sql函数是支持。)
出于示例目的,我使用的是 tidyquant 数据集。
install.packages('tidyquant')
library(tidyquant)
library(data.table)
options("getSymbols.warning4.0"=FALSE)
options("getSymbols.yahoo.warning"=FALSE)
# Downloading Apple price using quantmod
first.date <- Sys.Date() - 30
last.date <- Sys.Date()
getSymbols("AAPL", from = first.date,
to = last.date,warnings = FALSE,
auto.assign = TRUE)
AAPL<-data.frame(AAPL)
AAPL<- setDT(AAPL, keep.rownames = TRUE)[]
colnames(AAPL)[1] <- "DATE"
AAPL$RUNDATE<-AAPL$DATE
head(AAPL)
DATE AAPL.Open AAPL.High AAPL.Low AAPL.Close AAPL.Volume AAPL.Adjusted RUNDATE
1: 2021-03-19 119.90 121.43 119.68 119.99 185023200 119.99 2021-03-19
2: 2021-03-22 120.33 123.87 120.26 123.39 111912300 123.39 2021-03-22
3: 2021-03-23 123.33 124.24 122.14 122.54 95467100 122.54 2021-03-23
4: 2021-03-24 122.82 122.90 120.07 120.09 88530500 120.09 2021-03-24
5: 2021-03-25 119.54 121.66 119.00 120.59 98844700 120.59 2021-03-25
6: 2021-03-26 120.35 121.48 118.92 121.21 93958900 121.21 2021-03-26
这是我的目标,我已经想出如何使用以下命令使用 Databricks 中的 R 将此数据集上传到雪花中:
sparkr.sdf <- SparkR::createDataFrame(AAPL)
SparkR::write.df(
df = sparkr.sdf,
source = "snowflake",
mode = "overwrite",
sfUrl = "<snowflake-url>",
sfUser = user,
sfPassword = password,
sfDatabase = "<snowflake-database>",
sfSchema = "<snowflake-schema>",
sfWarehouse = "<snowflake-cluster>",
dbtable = "AAPL_table"
)
但是,我想每天 运行 上传最新日期的拉取(最近 30 天),追加并覆盖 RunDate 列中的最近 30 天。
我的想法是继续使用上面的相同查询,只是将 运行 日期列更改为今天的日期。 AAPL$RUNDATE<-Sys.Date()
每天。这样当我 运行 这个时,我可以从这个新的 AAPL$RUNDATE
中减去 30 天,并且只保留 AAPL$RUNDATE 向前移动的 30 天时间范围内的任何内容。
我看到了这段代码,但没有看到设置回顾限制的选项
DBI::dbSendQuery(snowflake.conn,"use schema schemaname") # strange this is required
dbWriteTable(snowflake.conn, 'tablename', df, append = T, overwrite = F, verbose = T)
或者这个 source:
# Write table
sparklyr.sdf <- copy_to(sc, iris, overwrite = TRUE)
sparklyr.sdf %>%
spark_write_source(
sc = sc,
mode = "overwrite",
source = "snowflake",
options = sf_options
)
我的想法是 运行 这个数据每天提取过去 30 天的数据,而不是整个时间段,并附加到现有数据集。
以前有没有人做过类似的事情,非常感谢对此的任何帮助,如果您需要更清楚地了解我正在努力完成的事情,请告诉我
假设您 运行 您的代码获取新数据并存储在名为 AAPL
的变量中,那么
AAPL$RUNDATE <- Sys.Date()
DBI::dbWriteTable(conn, AAPL, append = TRUE)
从这里开始,我认为您有两个选择:
删除超过 30 天的数据。在将新数据附加到 table 之后,然后 运行
DBI::dbExecute(conn, "delete from AAPL_table where DATEDIFF(day, RUNDATE, CURRENT_DATE()) > 30")
(使用
DATEDIFF
andCURRENT_DATE
中的示例形成。)作为替代方案,如果您今天没有运行清除,您可能更愿意使用
RUNDATE
中观察到的最大值table 而不是今天的日期,在这种情况下您可以使用maxdate <- DBI::dbGetQuery(conn, "select max(RUNDATE) as maxdate from AAPL_table")$maxdate if (length(maxdate) && !anyNA(maxdate)) { DBI::dbExecute(conn, "delete from AAPL_table where DATEDIFF(day, RUNDATE, ?) > 30", params = list(maxdate)) }
保留 table 中的旧数据,每当您从雪花实例中检索它时,只检索最近 30 天的价值:
x <- DBI::dbGetQuery(conn, "select ... from AAPL_table where RUNDATE >= ?", params = list(Sys.Date()))
(警告:我没有雪花,所以这是未经测试的。我在sql服务器上测试了它的前提没有问题,并验证了至少两个sql函数是支持。)