如何定义DataFrame的分区?

How to define partitioning of DataFrame?

我已经开始在 Spark 1.4.0 中使用 Spark SQL 和 DataFrames。我想在 Scala 中的 DataFrames 上定义自定义分区程序,但不知道如何执行此操作。

我正在使用的其中一个数据表包含按帐户分类的交易列表,与以下示例类似。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

至少在最初,大部分计算将发生在帐户内的交易之间。所以我想对数据进行分区,以便一个帐户的所有交易都在同一个 Spark 分区中。

但我没有找到定义它的方法。 DataFrame class 有一个名为 'repartition(Int)' 的方法,您可以在其中指定要创建的分区数。但是我没有看到任何可用于为 DataFrame 定义自定义分区程序的方法,例如可以为 RDD 指定的方法。

源数据存储在Parquet中。我确实看到,在将 DataFrame 写入 Parquet 时,您可以指定一个列进行分区,所以大概我可以告诉 Parquet 按 'Account' 列对数据进行分区。但是可能有数百万个帐户,如果我正确理解 Parquet,它会为每个帐户创建一个不同的目录,所以这听起来不是一个合理的解决方案。

有没有办法让 Spark 对这个 DataFrame 进行分区,以便一个帐户的所有数据都在同一个分区中?

使用返回的 DataFrame:

yourDF.orderBy(account)

没有明确的方法可以在 DataFrame 上使用 partitionBy,只能在 PairRDD 上使用,但是当您对 DataFrame 进行排序时,它将在它的 LogicalPlan 中使用它,这将在您需要进行计算时提供帮助在每个帐户上。

我只是偶然发现了同样的问题,我想按帐户对数据框进行分区。 我假设当你说 "want to have the data partitioned so that all of the transactions for an account are in the same Spark partition" 时,你想要它来提高规模和性能,但你的代码并不依赖于它(比如使用 mapPartitions() 等),对吧?

在 Spark < 1.6 中,如果你创建一个 HiveContext,而不是普通的旧 SqlContext,你可以使用 HiveQL DISTRIBUTE BY colX...(确保 N 个减速器中的每一个都得到非-例如 x) 和 CLUSTER BY colX... 的重叠范围(分布依据和排序依据的快捷方式);

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

不确定这如何适合 Spark DF api。正常的 SqlContext 不支持这些关键字(请注意,您不需要具有配置单元元存储即可使用 HiveContext)

编辑: Spark 1.6+ 现在在原生 DataFrame 中有这个 API

所以从某种答案开始:) - 你不能

我不是专家,但据我了解,DataFrames 不等于 rdd,而且 DataFrame 没有 Partitioner 这样的东西。

一般来说,DataFrame 的想法是提供另一个抽象级别来自行处理此类问题。 DataFrame 上的查询被翻译成逻辑计划,再进一步翻译成 RDD 上的操作。您建议的分区可能会自动应用或至少应该自动应用。

如果您不相信 SparkSQL 会提供某种最佳作业,您可以随时按照评论中的建议将 DataFrame 转换为 RDD[Row]。

我能够使用 RDD 做到这一点。但我不知道这是否是您可以接受的解决方案。 将 DF 作为 RDD 提供后,您可以应用 repartitionAndSortWithinPartitions 来执行数据的自定义重新分区。

这是我使用的示例:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)

Spark >= 2.3.0

SPARK-22614 公开范围分区。

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 exposes external format partitioning in the Data Source API v2.

Spark >= 1.6.0

在 Spark >= 1.6 中,可以使用按列分区进行查询和缓存。请参阅:SPARK-11410 and SPARK-4849 使用 repartition 方法:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDs Spark Dataset(包括 Dataset[Row] a.k.a DataFrame)不同,目前无法使用自定义分区程序。您通常可以通过创建一个人工分区列来解决这个问题,但它不会给您同样的灵活性。

Spark < 1.6.0:

您可以做的一件事是在创建 DataFrame

之前对输入数据进行预分区
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

由于 DataFrameRDD 创建只需要一个简单的映射阶段,应保留现有分区布局*:

assert(df.rdd.partitions == partitioned.partitions)

用同样的方法重新分区现有的 DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

这么看来,也不是没有可能。问题仍然存在,它是否有意义。我会争辩说大多数时候它不会:

  1. 重新分区是一个昂贵的过程。在典型的场景中,大多数数据都必须进行序列化、混洗和反序列化。另一方面,可以从预分区数据中获益的操作数量相对较少,如果内部 API 未设计为利用此 属性,则会进一步受到限制。

    • 部分场景加入,但需要内部支持,
    • window 函数调用与匹配的分区程序。同上,仅限于单个 window 定义。虽然它已经在内部分区了,所以预分区可能是多余的,
    • 使用 GROUP BY 的简单聚合 - 可以减少临时缓冲区的内存占用**,但总体成本要高得多。或多或少相当于 groupByKey.mapValues(_.reduce) (当前行为)与 reduceByKey (预分区)。在实践中不太可能有用。
    • 数据压缩SqlContext.cacheTable。由于看起来它正在使用 运行 长度编码,因此应用 OrderedRDDFunctions.repartitionAndSortWithinPartitions 可以提高压缩率。
  2. 性能在很大程度上取决于密钥的分布。如果它是倾斜的,它将导致资源利用率不理想。在最坏的情况下,根本不可能完成这项工作。

  3. 使用高级声明的全部意义 API 是将您自己与低级实现细节隔离开来。正如 @dwysakowicz and @RomiKuntsman an optimization is a job of the Catalyst Optimizer 已经提到的那样。这是一个非常复杂的野兽,我真的怀疑你是否可以轻松地改进它而无需更深入地研究它的内部结构。

相关概念

使用 JDBC 源进行分区:

JDBC 数据源支持 predicates argument。可以按如下方式使用:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

它为每个谓词创建一个 JDBC 分区。请记住,如果使用单个谓词创建的集合不是不相交的,您将在结果 table.

中看到重复项

partitionBy DataFrameWriter中的方法:

Spark DataFrameWriter 提供了 partitionBy 方法,可用于 "partition" 写入数据。它使用提供的一组列

在写入时分隔数据
val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

这会在基于键的查询的读取上启用谓词下推:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

但不等同于DataFrame.repartition。特别是聚合,如:

val cnts = df1.groupBy($"k").sum()

仍然需要 TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy DataFrameWriter 中的方法(Spark >= 2.0):

bucketBy 具有与 partitionBy 类似的应用程序,但它仅适用于 tables (saveAsTable)。分桶信息可用于优化连接:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 分区布局 我的意思只是数据分布。 partitioned RDD 不再有分区程序。 ** 假设没有早期预测。如果聚合仅涵盖列的一小部分,则可能没有任何好处。