Spark 如何跟踪 randomSplit 中的拆分?
How does Spark keep track of the splits in randomSplit?
这个问题解释了 Spark 的随机拆分是如何工作的,,但我不明白 spark 如何跟踪哪些值进入了一次拆分,以便这些相同的值不会进入第二次拆分.
如果我们看一下 randomSplit 的实现:
def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = {
// It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
// constituent partitions each time a split is materialized which could result in
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic.
val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
}.toArray
}
我们可以看到它创建了两个 DataFrame,它们共享相同的 sqlContext 和两个不同的 Sample(rs)。
这两个 DataFrame 如何相互通信,以便第一个中的值不包含在第二个中?
并且数据被提取了两次? (假设 sqlContext select 来自数据库,select 是否被执行了两次?)。
这与对 RDD 进行采样完全相同。
假设你有权重数组(0.6, 0.2, 0.2)
,Spark将为每个范围(0.0, 0.6), (0.6, 0.8), (0.8, 1.0)
生成一个DataFrame。
当需要读取结果 DataFrame 时,Spark 将遍历父 DataFrame。对于每个项目,生成一个随机数,如果该数字落在指定范围内,则发出该项目。所有子 DataFrame 共享相同的随机数生成器(技术上,具有相同种子的不同生成器),因此随机数的序列是确定性的。
关于你的最后一个问题,如果你没有缓存父DataFrame,那么每次计算输出DataFrame时都会重新获取输入DataFrame的数据。
这个问题解释了 Spark 的随机拆分是如何工作的,
如果我们看一下 randomSplit 的实现:
def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = {
// It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
// constituent partitions each time a split is materialized which could result in
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic.
val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
}.toArray
}
我们可以看到它创建了两个 DataFrame,它们共享相同的 sqlContext 和两个不同的 Sample(rs)。
这两个 DataFrame 如何相互通信,以便第一个中的值不包含在第二个中?
并且数据被提取了两次? (假设 sqlContext select 来自数据库,select 是否被执行了两次?)。
这与对 RDD 进行采样完全相同。
假设你有权重数组(0.6, 0.2, 0.2)
,Spark将为每个范围(0.0, 0.6), (0.6, 0.8), (0.8, 1.0)
生成一个DataFrame。
当需要读取结果 DataFrame 时,Spark 将遍历父 DataFrame。对于每个项目,生成一个随机数,如果该数字落在指定范围内,则发出该项目。所有子 DataFrame 共享相同的随机数生成器(技术上,具有相同种子的不同生成器),因此随机数的序列是确定性的。
关于你的最后一个问题,如果你没有缓存父DataFrame,那么每次计算输出DataFrame时都会重新获取输入DataFrame的数据。