使用 Sparklyr 在 R 中创建 Lazy Spark 读写 Parquet 作业
Create Lazy Spark Read-Write Parquet Job in R using Sparklyr
我想创建一个 spark 作业,该作业从 sql 源(使用 'spark_read_jdbc')读取,然后将结果写入镶木地板文件 ('spark_write_parquet')。
此操作需要多次执行 sql 语句中的小增量。我希望懒惰地创建工作,以便多个执行者可以接手工作。这是一些示例玩具代码:
sql = "SELECT * FROM TBL_%s"
for(i in seq(1,10)){
sql_to_read = sprintf(sql, i)
optionSet$dbtable = paste0("(", sql_to_read ,") foo")
TEMP = spark_read_jdbc(sc, "TEMP", options = optionSet, memory = FALSE)
TEMP = TEMP %>% mutate(id := i)
TEMP %>% spark_write_parquet(., path = "/TEMP.parquet", mode = "append", partition_by = id)
}
问题是写操作不能懒惰地强制执行串行操作。有没有办法编写这段代码,以便为完整操作创建一个 spark 作业,然后当我启动 'collect' 语句时,多个执行程序将执行该操作?
您总是可以使用您的代码结构来完全避免该问题。因为所有写入都使用相同的输出 table 您可以将输入定义为单个联合:
xs <- 1:10
query <- glue::glue("SELECT {xs} AS _id, * FROM TBL_{xs}") %>%
glue::collapse(" UNION ALL \n")
然后
optionSet$dbtable <- glue::glue("({query}) tmp")
spark_read_jdbc(sc, "TEMP", options = optionSet, memory = FALSE) %>%
spark_write_parquet(., path = "/TEMP.parquet", mode = "append", partition_by = id)
您还可以使用 id
作为 partitionColumn
和 min(xs)
/ max(xs)
作为 lowerBound
/ upperBound
到 parallelize reads.
我想创建一个 spark 作业,该作业从 sql 源(使用 'spark_read_jdbc')读取,然后将结果写入镶木地板文件 ('spark_write_parquet')。
此操作需要多次执行 sql 语句中的小增量。我希望懒惰地创建工作,以便多个执行者可以接手工作。这是一些示例玩具代码:
sql = "SELECT * FROM TBL_%s"
for(i in seq(1,10)){
sql_to_read = sprintf(sql, i)
optionSet$dbtable = paste0("(", sql_to_read ,") foo")
TEMP = spark_read_jdbc(sc, "TEMP", options = optionSet, memory = FALSE)
TEMP = TEMP %>% mutate(id := i)
TEMP %>% spark_write_parquet(., path = "/TEMP.parquet", mode = "append", partition_by = id)
}
问题是写操作不能懒惰地强制执行串行操作。有没有办法编写这段代码,以便为完整操作创建一个 spark 作业,然后当我启动 'collect' 语句时,多个执行程序将执行该操作?
您总是可以使用您的代码结构来完全避免该问题。因为所有写入都使用相同的输出 table 您可以将输入定义为单个联合:
xs <- 1:10
query <- glue::glue("SELECT {xs} AS _id, * FROM TBL_{xs}") %>%
glue::collapse(" UNION ALL \n")
然后
optionSet$dbtable <- glue::glue("({query}) tmp")
spark_read_jdbc(sc, "TEMP", options = optionSet, memory = FALSE) %>%
spark_write_parquet(., path = "/TEMP.parquet", mode = "append", partition_by = id)
您还可以使用 id
作为 partitionColumn
和 min(xs)
/ max(xs)
作为 lowerBound
/ upperBound
到 parallelize reads.