How to repartition a data frame in sparklyr
For some reason this is hard to find. I can easily find the repartition function in pyspark and SparkR, but no such function seems to exist in sparklyr. Does anyone know how to repartition a Spark DataFrame in sparklyr?
You can try something like this:
library(sparklyr)
library(dplyr)
library(stringi)

#' @param df tbl_spark
#' @param numPartitions numeric number of partitions
#' @param ... character column names to partition by
repartition <- function(df, numPartitions, ...) {
  # Create a unique name for the temporary view
  alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")
  # Get the underlying Spark DataFrame (a jobj)
  sdf <- df %>% spark_dataframe()
  # Convert column names to Spark Column objects
  exprs <- lapply(
    list(...),
    function(x) invoke(sdf, "apply", x)
  )
  sdf %>%
    invoke("repartition", as.integer(numPartitions), exprs) %>%
    # Use "registerTempTable" with Spark 1.x
    invoke("createOrReplaceTempView", alias)
  tbl(sc, alias)
}
Example usage:
df <- copy_to(sc, iris)
repartition(df, 3, "Species") %>% optimizedPlan
## <jobj[182]>
## class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
## RepartitionByExpression [Species#775], 3
## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
## : +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
repartition(df, 7) %>% optimizedPlan
## <jobj[69]>
## class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
## RepartitionByExpression 7
## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
## : +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
Here optimizedPlan is a small helper that extracts the optimized logical plan from a tbl_spark (the same invoke("queryExecution") %>% invoke("optimizedPlan") chain used in the answer below); a sketch follows.
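A minimal sketch of such a helper, assuming the invoke chain shown in the next answer; the name optimizedPlan simply matches the calls above:

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%        # tbl_spark -> Spark DataFrame (jobj)
    invoke("queryExecution") %>% # fetch the QueryExecution object
    invoke("optimizedPlan")      # return the optimized logical plan
}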
Now you can use sdf_repartition(), for example:
iris_tbl %>%
  sdf_repartition(5L, columns = c("Species", "Petal_Width")) %>%
  spark_dataframe() %>%
  invoke("queryExecution") %>%
  invoke("optimizedPlan")
# <jobj[139]>
# class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
# RepartitionByExpression [Species#14, Petal_Width#13], 5
# +- InMemoryRelation [Sepal_Length#10, Sepal_Width#11, Petal_Length#12, Petal_Width#13, Species#14], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
# +- *FileScan csv [Sepal_Length#10,Sepal_Width#11,Petal_Length#12,Petal_Width#13,Species#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/var/folders/ry/_l__tbl57d940bk2kgj8q2nj3s_d9b/T/Rtmpjgtnl6/spark_serializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
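If you only need to change the number of partitions, without partitioning by particular columns, sdf_repartition() also accepts just a partition count. A hedged sketch (the sdf_num_partitions() check assumes a reasonably recent sparklyr version):

iris_tbl %>%
  sdf_repartition(partitions = 8L) %>%
  sdf_num_partitions()  # should report 8 partitions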