R Spark 一次从文件夹中读取一个文件,与 Shiny 集成
R Spark read one file at a time from folder, integrate with Shiny
我在 HDFS 上有一个文件夹,其中包含 10 个 CSV 文件。每个 CSV 文件包含 10000 行和 17 列。
Objective
响应式读取 HDFS 上的文件夹。
如果文件夹中包含文件,则从文件夹中一次读取一个文件(从旧到新)。
在 Shiny 中绘制一些参数。
当新文件添加到文件夹或从文件夹中读取时更新绘图。
状态 目前,使用 SparklyR,我能够一次反应性地读取所有文件并生成包含 100000 个点 (ggplot) 的图。如果我在启动应用程序后添加第 11 个文件(包含 10000 行),绘图将更新为 110000 点。
library(sparklyr)
conf = spark_config()
conf$spark.driver.memory="50g"
sc <- spark_connect(master = "local[*]", config = conf)
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/")
ui <- function(){
plotOutput("plot")
}
server <- function(input, output, session){
ps <- reactiveSpark(read_folder, intervalMillis = 10)
output$plot <- renderPlot({
df2 = ps()
# str(df2)
ggplot(data = df2, aes(x=Time, y=outletN2)) + geom_point() + ggtitle(nrow(df2)) + theme_bw()
})
}
shinyApp(ui, server)
SessionInfo()
# R version 3.5.1 (2018-07-02)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows Server >= 2012 x64 (build 9200)
#
# Matrix products: default
#
# locale:
# [1] LC_COLLATE=English_United States.1252 LC_CTYPE=English_United States.1252
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C
# [5] LC_TIME=English_United States.1252
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] shinyFiles_0.7.2 bindrcpp_0.2.2 dplyr_0.7.8 shiny_1.2.0 ggplot2_3.1.0
# [6] future_1.10.0 sparklyr_0.9.3.9000
#
# loaded via a namespace (and not attached):
# [1] tidyselect_0.2.5 forge_0.1.9002 purrr_0.2.5 listenv_0.7.0 lattice_0.20-38 colorspace_1.3-2
# [7] generics_0.0.2 htmltools_0.3.6 yaml_2.2.0 base64enc_0.1-3 rlang_0.3.0.1 later_0.7.5
# [13] pillar_1.3.0 glue_1.3.0 withr_2.1.2 DBI_1.0.0 dbplyr_1.2.2 bindr_0.1.1
# [19] plyr_1.8.4 munsell_0.5.0 gtable_0.2.0 htmlwidgets_1.3 codetools_0.2-15 labeling_0.3
# [25] httpuv_1.4.5 parallel_3.5.1 broom_0.5.1 r2d3_0.2.2 Rcpp_1.0.0 xtable_1.8-3
# [31] openssl_1.1 promises_1.0.1 backports_1.1.2 scales_1.0.0 jsonlite_1.6 config_0.3
# [37] fs_1.2.6 mime_0.6 digest_0.6.18 grid_3.5.1 rprojroot_1.3-2 tools_3.5.1
# [43] magrittr_1.5 lazyeval_0.2.1 tibble_1.4.2 crayon_1.3.4 tidyr_0.8.2 pkgconfig_2.0.2
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0 rstudioapi_0.8 R6_2.3.0 globals_0.12.4
# [55] nlme_3.1-137 compiler_3.5.1
但我真正想要的是响应式地一次读取一个文件并制作一个 ggplot。这类似于 Spark Streaming,但 Spark Streaming(据我了解)将所有文本文件读入单个 RDD。从 Spark 的文档中,Python 中存在一个名为 SparkContext.wholeTextFiles 的函数,它允许您读取包含多个小文本文件的目录,并且 returns 每个它们作为(文件名,内容)对(link). I haven't tested it as I want to keep everything in R right now. I looked into shinyFiles but couldn't find any function that does this (https://github.com/thomasp85/shinyFiles)。
R/Sparklyr有没有类似的东西?我想做的事听起来很傻吗?如果您认为在 R 中有更有效的方法来实现它,我洗耳恭听!
谢谢。
我在我的一个项目中遇到了你的问题。我最终使用的是 reactivePoll 函数来更新我的情节。
所以你有两个选择,要么每 x 秒更新一次绘图,而不知道是否有新文件。在这个例子中 120 秒所以两分钟:
您在应用程序代码的开头初始化累加器 b。
b <- 0
IsThereNew = function(){
b <<- b+1
b
}
ReadHdfsData=function(){ # A function that calculates the underlying value
path <- paste0("/your/path/to/data.json")
df <- sc %>%
spark_read_json("name", path) %>%
collect()
return(df)
}
df <- reactivePoll(120 * 1000, session, IsThereNew, ReadHdfsData)
所以在这种情况下,以一种愚蠢的方式,即使没有新数据,您也每 2 分钟更新一次绘图。
你可以做的另一种方法是列出 hdfs 目录中的文件数,每 x 秒一次,如果列表计数被修改,那么绘图将被更新
因此,您必须定义一个函数 listNumberOfFiles,其中 returns 文件的数量并替换 isThereNew 函数。
在@tricky 的帮助下,我找到了一种方法。下面的完整解决方案。脏但现在有效。
# Get list of current files in HDFS
files <- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
# Extract file names
fileNames <- na.omit(str_extract(files, "(?<=/)[^/]*$"))
# CheckFunc for reactivePoll, checks for changes in fileNames
listFiles <- function(){
files <<- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
fileNames <<- na.omit(str_extract(files, "(?<=/)[^/]*$"))
fileNames
}
# ValueFunc for reactivePoll. Returns a vector of HDFS filepaths
ReadHdfsData=function(){
path <- paste0("hdfs://localhost:9000/nik_ml/", fileNames)
return(path)
}
ui3 <- function(){
plotOutput("plot")
}
server3 <- function(input, output, session){
output$plot <- renderPlot({
allFiles <- reactivePoll(5 * 1000, session, listFiles, ReadHdfsData)
# Find filepaths which are added to HDFS
newFile <<- setdiff(allFiles(), newFile)
# print(newFile)
# Do something with each new file.
# I am plotting currently, but I will end up using it for ML predictions.
for(temp in newFile){
df <- spark_read_csv(sc, "name", temp) %>%
select(Time, outletN2) %>%
collect()
# print(head(df))
p1 <- ggplot(data = df, aes(x=Time, y=outletN2)) +
geom_point() +
ggtitle(paste("File =",temp)) +
theme_bw()
print(p1)
}
})
}
# Initialise newFile to "" before running the app
newFile <- character(0)
shinyApp(ui3, server3)
我在 HDFS 上有一个文件夹,其中包含 10 个 CSV 文件。每个 CSV 文件包含 10000 行和 17 列。
Objective
响应式读取 HDFS 上的文件夹。
如果文件夹中包含文件,则从文件夹中一次读取一个文件(从旧到新)。
在 Shiny 中绘制一些参数。
当新文件添加到文件夹或从文件夹中读取时更新绘图。
状态 目前,使用 SparklyR,我能够一次反应性地读取所有文件并生成包含 100000 个点 (ggplot) 的图。如果我在启动应用程序后添加第 11 个文件(包含 10000 行),绘图将更新为 110000 点。
library(sparklyr)
conf = spark_config()
conf$spark.driver.memory="50g"
sc <- spark_connect(master = "local[*]", config = conf)
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/")
ui <- function(){
plotOutput("plot")
}
server <- function(input, output, session){
ps <- reactiveSpark(read_folder, intervalMillis = 10)
output$plot <- renderPlot({
df2 = ps()
# str(df2)
ggplot(data = df2, aes(x=Time, y=outletN2)) + geom_point() + ggtitle(nrow(df2)) + theme_bw()
})
}
shinyApp(ui, server)
SessionInfo()
# R version 3.5.1 (2018-07-02)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows Server >= 2012 x64 (build 9200)
#
# Matrix products: default
#
# locale:
# [1] LC_COLLATE=English_United States.1252 LC_CTYPE=English_United States.1252
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C
# [5] LC_TIME=English_United States.1252
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] shinyFiles_0.7.2 bindrcpp_0.2.2 dplyr_0.7.8 shiny_1.2.0 ggplot2_3.1.0
# [6] future_1.10.0 sparklyr_0.9.3.9000
#
# loaded via a namespace (and not attached):
# [1] tidyselect_0.2.5 forge_0.1.9002 purrr_0.2.5 listenv_0.7.0 lattice_0.20-38 colorspace_1.3-2
# [7] generics_0.0.2 htmltools_0.3.6 yaml_2.2.0 base64enc_0.1-3 rlang_0.3.0.1 later_0.7.5
# [13] pillar_1.3.0 glue_1.3.0 withr_2.1.2 DBI_1.0.0 dbplyr_1.2.2 bindr_0.1.1
# [19] plyr_1.8.4 munsell_0.5.0 gtable_0.2.0 htmlwidgets_1.3 codetools_0.2-15 labeling_0.3
# [25] httpuv_1.4.5 parallel_3.5.1 broom_0.5.1 r2d3_0.2.2 Rcpp_1.0.0 xtable_1.8-3
# [31] openssl_1.1 promises_1.0.1 backports_1.1.2 scales_1.0.0 jsonlite_1.6 config_0.3
# [37] fs_1.2.6 mime_0.6 digest_0.6.18 grid_3.5.1 rprojroot_1.3-2 tools_3.5.1
# [43] magrittr_1.5 lazyeval_0.2.1 tibble_1.4.2 crayon_1.3.4 tidyr_0.8.2 pkgconfig_2.0.2
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0 rstudioapi_0.8 R6_2.3.0 globals_0.12.4
# [55] nlme_3.1-137 compiler_3.5.1
但我真正想要的是响应式地一次读取一个文件并制作一个 ggplot。这类似于 Spark Streaming,但 Spark Streaming(据我了解)将所有文本文件读入单个 RDD。从 Spark 的文档中,Python 中存在一个名为 SparkContext.wholeTextFiles 的函数,它允许您读取包含多个小文本文件的目录,并且 returns 每个它们作为(文件名,内容)对(link). I haven't tested it as I want to keep everything in R right now. I looked into shinyFiles but couldn't find any function that does this (https://github.com/thomasp85/shinyFiles)。
R/Sparklyr有没有类似的东西?我想做的事听起来很傻吗?如果您认为在 R 中有更有效的方法来实现它,我洗耳恭听!
谢谢。
我在我的一个项目中遇到了你的问题。我最终使用的是 reactivePoll 函数来更新我的情节。
所以你有两个选择,要么每 x 秒更新一次绘图,而不知道是否有新文件。在这个例子中 120 秒所以两分钟: 您在应用程序代码的开头初始化累加器 b。
b <- 0
IsThereNew = function(){
b <<- b+1
b
}
ReadHdfsData=function(){ # A function that calculates the underlying value
path <- paste0("/your/path/to/data.json")
df <- sc %>%
spark_read_json("name", path) %>%
collect()
return(df)
}
df <- reactivePoll(120 * 1000, session, IsThereNew, ReadHdfsData)
所以在这种情况下,以一种愚蠢的方式,即使没有新数据,您也每 2 分钟更新一次绘图。
你可以做的另一种方法是列出 hdfs 目录中的文件数,每 x 秒一次,如果列表计数被修改,那么绘图将被更新 因此,您必须定义一个函数 listNumberOfFiles,其中 returns 文件的数量并替换 isThereNew 函数。
在@tricky 的帮助下,我找到了一种方法。下面的完整解决方案。脏但现在有效。
# Get list of current files in HDFS
files <- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
# Extract file names
fileNames <- na.omit(str_extract(files, "(?<=/)[^/]*$"))
# CheckFunc for reactivePoll, checks for changes in fileNames
listFiles <- function(){
files <<- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
fileNames <<- na.omit(str_extract(files, "(?<=/)[^/]*$"))
fileNames
}
# ValueFunc for reactivePoll. Returns a vector of HDFS filepaths
ReadHdfsData=function(){
path <- paste0("hdfs://localhost:9000/nik_ml/", fileNames)
return(path)
}
ui3 <- function(){
plotOutput("plot")
}
server3 <- function(input, output, session){
output$plot <- renderPlot({
allFiles <- reactivePoll(5 * 1000, session, listFiles, ReadHdfsData)
# Find filepaths which are added to HDFS
newFile <<- setdiff(allFiles(), newFile)
# print(newFile)
# Do something with each new file.
# I am plotting currently, but I will end up using it for ML predictions.
for(temp in newFile){
df <- spark_read_csv(sc, "name", temp) %>%
select(Time, outletN2) %>%
collect()
# print(head(df))
p1 <- ggplot(data = df, aes(x=Time, y=outletN2)) +
geom_point() +
ggtitle(paste("File =",temp)) +
theme_bw()
print(p1)
}
})
}
# Initialise newFile to "" before running the app
newFile <- character(0)
shinyApp(ui3, server3)