Spark:read.jdbc(..numPartitions..) 和 repartition(..numPartitions..) 中的 numPartitions 之间的区别

Spark: Difference between numPartitions in read.jdbc(..numPartitions..) and repartition(..numPartitions..)

我对以下方法中 numPartitions 参数的行为感到困惑:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

DataFrameReader.jdbcofficial docs关于numPartitions参数说如下

numPartitions: the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly.

Dataset.repartitionofficial docs

Returns a new Dataset that has exactly numPartitions partitions.


我目前的理解:

  1. DataFrameReader.jdbc方法中的numPartition参数控制从数据库
  2. 读取数据的并行度
  3. Dataset.repartition 中的 numPartition 参数控制 输出文件的数量 将在 DataFrame 写入磁盘时生成

我的问题:

  1. 如果我通过 DataFrameReader.jdbc 读取 DataFrame,然后将其写入磁盘(不调用 repartition 方法),那么输出中的文件数量是否仍会一样多'在调用 repartition 之后我是否将 DataFrame 写入磁盘?
  2. 如果以上问题的答案是:
    • 是:那么在使用 DataFrameReader.jdbc 方法(使用 numPartitions 参数)读取的 DataFrame 上调用 repartition 方法是否多余?
    • 否:那请指正我理解中的错误。同样在那种情况下, DataFrameReader.jdbc 方法的 numPartitions 参数不应该被称为 'parallelism'?

简短回答:两种方法中 numPartitions 参数的行为(几乎)没有区别


read.jdbc(..numPartitions..)

这里,numPartitions参数控制:

  1. MySQL(或任何其他RDBM读取数据的并行连接数 =92=] 变成 DataFrame.
  2. 所有后续读取操作的并行度DataFrame包括写入磁盘直到repartition方法被调用

repartition(..numPartitions..)

此处 numPartitions 参数控制 并行度 将在 执行 [=15] 的任何操作 时显示=],包括写入磁盘


所以基本上使用 spark.read.jdbc(..numPartitions..) 方法读取 MySQL table 获得的 DataFrame 表现相同(表现出相同的 并行度 在对它执行的操作中)就好像它是 read 没有 parallelism 并且之后调用了 repartition(..numPartitions..) 方法(显然具有相同的值 numPartitions)


要回答确切的问题:

If I read DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking repartition method), then would there still be as many files in output as there would've been had I written out a DataFrame to disk after having invoked repartition on it?

假设 read 任务已通过提供适当的参数(columnNamelowerBound并行化 upperBound & numPartitions), all 对生成的 DataFrame 包括写入 的操作将并行执行.此处引用 official docs

numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.


Yes: Then is it redundant to invoke repartition method on a DataFrame that was read using DataFrameReader.jdbc method (with numPartitions parameter)?

除非您调用 repartition 方法的其他变体(采用 columnExprs 参数的方法),否则在这样的 DataFrame 上调用 repartition(具有相同的 numPartitions) 参数是多余的。但是,我不确定在 already-parallelized DataFrame 上强制执行相同的 并行度 是否也会调用 不必要地在 executors 中混洗 数据。一旦我遇到它会更新答案。