How to repartition a data frame in sparklyr

For some reason this is proving hard to find. I can easily locate a repartition function in PySpark and SparkR, but no such function seems to exist in sparklyr.

Does anyone know how to repartition a Spark data frame in sparklyr?

You can try something like this:

library(sparklyr)
library(dplyr)
library(stringi)


#' @param df tbl_spark
#' @param numPartitions numeric number of partitions
#' @param ... character column names to partition by
repartition <- function(df, numPartitions, ...) {
  # Retrieve the connection from the input instead of relying on a global sc
  sc <- spark_connection(df)

  # Create a unique output name
  alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")

  # Convert to the underlying Spark DataFrame (JVM object reference)
  sdf <- df %>% spark_dataframe()

  # Convert column names to Spark Column objects
  exprs <- lapply(
    list(...),
    function(x) invoke(sdf, "apply", x)
  )

  sdf %>%
    invoke("repartition", as.integer(numPartitions), exprs) %>%
    # Use "registerTempTable" with Spark 1.x
    invoke("createOrReplaceTempView", alias)

  tbl(sc, alias)
}

Example usage:

df <- copy_to(sc, iris)

repartition(df, 3, "Species") %>% optimizedPlan()

## <jobj[182]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression [Species#775], 3
## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

repartition(df, 7) %>% optimizedPlan()
## <jobj[69]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression 7
## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

Here optimizedPlan is a helper function defined in a separate answer.
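A minimal sketch of such a helper, assuming it simply surfaces the optimized logical plan of the query execution (the same invoke chain used with sdf_repartition() below), rather than the original author's exact definition:

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%          # extract the underlying JVM Dataset
    invoke("queryExecution") %>%   # get its QueryExecution
    invoke("optimizedPlan")        # pull out the optimized logical plan
}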

Now you can use sdf_repartition(), for example:

iris_tbl %>%
  sdf_repartition(5L, columns = c("Species", "Petal_Width")) %>%
  spark_dataframe() %>%
  invoke("queryExecution") %>%
  invoke("optimizedPlan") 
# <jobj[139]>
#   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
# RepartitionByExpression [Species#14, Petal_Width#13], 5
# +- InMemoryRelation [Sepal_Length#10, Sepal_Width#11, Petal_Length#12, Petal_Width#13, Species#14], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
#    +- *FileScan csv [Sepal_Length#10,Sepal_Width#11,Petal_Length#12,Petal_Width#13,Species#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/var/folders/ry/_l__tbl57d940bk2kgj8q2nj3s_d9b/T/Rtmpjgtnl6/spark_serializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
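If you just want to confirm the partition count rather than inspect the plan, a quick check is possible with sdf_num_partitions() (assuming a sparklyr release that provides it), or by asking the underlying RDD directly:

# Partition count via sdf_num_partitions(), where available
iris_tbl %>%
  sdf_repartition(5L) %>%
  sdf_num_partitions()
# [1] 5

# Equivalent check through the JVM API
iris_tbl %>%
  sdf_repartition(5L) %>%
  spark_dataframe() %>%
  invoke("rdd") %>%                # Dataset -> RDD
  invoke("getNumPartitions")       # number of partitions
# [1] 5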
#                                               +- *FileScan csv [Sepal_Length#10,Sepal_Width#11,Petal_Length#12,Petal_Width#13,Species#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/var/folders/ry/_l__tbl57d940bk2kgj8q2nj3s_d9b/T/Rtmpjgtnl6/spark_serializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>