DataFrame-ified zipWithIndex
I'm trying to solve the age-old problem of adding a sequence number to a dataset. I'm working with DataFrames, and there appears to be no DataFrame equivalent of RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:
val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
  origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
  StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
In my actual application, origDF won't be loaded directly from a file -- it will be created by joining 2-3 other DataFrames together, and it will contain upwards of 100 million rows.
Is there a better way to do this? What can I do to optimize it?
The following was posted on behalf of David Griffin (edited out of the question).
The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in front or in back:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row
def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}
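For reference, a minimal usage sketch of the method above (the input DataFrame and the parameter values here are made up for illustration and are not part of the original answer):

// Assuming a SparkSession named `spark` is already available.
import spark.implicits._

// Hypothetical 3-row DataFrame, just to show the shape of the result.
val people = Seq(("alice", 29), ("bob", 31), ("carol", 40)).toDF("name", "age")

dfZipWithIndex(people).show()
// "id" column in front, values 1, 2, 3

dfZipWithIndex(people, offset = 100, colName = "seq", inFront = false).show()
// "seq" column appended at the end, values 100, 101, 102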
As of Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use org.apache.spark.sql.functions.row_number. Note that I found the dfZipWithIndex above to perform significantly faster than the algorithm below. But I'm posting it because:
- someone else is going to be tempted to try this
- maybe someone can optimize the expressions below

At any rate, the following works for me:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))
Note that I use lit(1) for both the partitioning and the ordering -- this makes everything be in the same partition, and it seems to preserve the original ordering of the DataFrame, but I suppose that is also what slows it way down.
I tested this on a 4-column DataFrame with 7,000,000 rows, and the speed difference between it and the dfZipWithIndex above is significant (like I said, the RDD functions are much, much faster).
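One hedged way to see the single-partition behaviour described above for yourself (this check is my own addition, not part of the original answer):

// `df` is any DataFrame; the window without a real partitioning key forces an
// exchange that moves all rows into a single partition.
val numbered = df.withColumn("row_num",
  row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))
numbered.explain()                        // look for the single-partition exchange in the plan
println(numbered.rdd.getNumPartitions)    // typically 1 after the window is applied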
PySpark version:
from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
    and preserves a schema

    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''

    new_schema = StructType(
        [StructField(colName,LongType(),True)]    # new added field in front
        + df.schema.fields                        # previous schema
    )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))

    return spark.createDataFrame(new_rdd, new_schema)
Also created a JIRA to add this functionality natively in Spark: https://issues.apache.org/jira/browse/SPARK-23074
As of Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonic index for each row.
But the ids are not consecutive: each partition starts a new range, so we have to calculate each partition's offset before using it.
Trying to provide an "rdd-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause OOM.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt" )
    .collect()
    .map(_.getLong(0))
    .toArray

  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
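To make the arithmetic in the select above concrete, here is a small self-contained walk-through with made-up partition sizes (the numbers are purely illustrative):

// Hypothetical example: 3 partitions holding 3, 2 and 4 rows, offset = 1.
// monotonically_increasing_id() puts the partition id in the upper bits, so the
// per-partition range of partition p starts at p << 33.
val offset   = 1L
val counts   = Seq(3L, 2L, 4L)              // count(lit(1)) per partition
val firstIds = Seq(0L, 1L << 33, 2L << 33)  // first("inc_id") per partition
val runningSums = counts.scanLeft(0L)(_ + _).tail   // sum("cnt") over partition_id order
val partitionOffsets = counts.indices.map { p =>
  runningSums(p) - counts(p) - firstIds(p) + offset
}
// index = partition_offset + inc_id, which gives 1..3 in partition 0,
// 4..5 in partition 1 and 6..9 in partition 2 -- one consecutive global index.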
This solution doesn't repack the original rows and doesn't repartition the original huge dataframe, so it is quite fast in the real world:
200 GB of CSV data (43 million rows with 150 columns) read, indexed and packed to parquet in 2 minutes on 240 cores.
After testing, my solution ran 20 seconds slower than the rdd-based one.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
@Evgeny, very interesting. Note that there is a bug when you have empty partitions (the array is missing those partition indexes; at least this is what happens to me with Spark 1.6), so I converted the array into a Map(partitionId -> offsets).
Additionally, I took the source of monotonically_increasing_id and adapted it so that "inc_id" starts from 0 in each partition.
Here is an updated version:
import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.catalyst.expressions.Nondeterministic
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratedExpressionCode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
case class PartitionMonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
  /**
   * From org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
   *
   * Record ID within each partition. By being transient, count's value is reset to 0 every time
   * we serialize and deserialize and initialize it.
   */
  @transient private[this] var count: Long = _

  override protected def initInternal(): Unit = {
    count = 1L // notice this starts at 1, not 0 as in org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
  }

  override def nullable: Boolean = false

  override def dataType: DataType = LongType

  override protected def evalInternal(input: InternalRow): Long = {
    val currentCount = count
    count += 1
    currentCount
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val countTerm = ctx.freshName("count")
    ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 1L;")
    ev.isNull = "false"
    s"""
      final ${ctx.javaType(dataType)} ${ev.value} = $countTerm;
      $countTerm++;
    """
  }
}
object DataframeUtils {
  def zipWithIndex(df: DataFrame, offset: Long = 0, indexName: String = "index") = {
    // from
    val dfWithPartitionId = df
      .withColumn("partition_id", spark_partition_id())
      .withColumn("inc_id", new Column(PartitionMonotonicallyIncreasingID()))

    // collect each partition size, create the offset pages
    val partitionOffsets: Map[Int, Long] = dfWithPartitionId
      .groupBy("partition_id")
      .agg(max("inc_id") as "cnt") // in each partition, count(inc_id) is equal to max(inc_id) (I don't know which one would be faster)
      .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") + lit(offset) as "cnt")
      .collect()
      .map(r => (r.getInt(0) -> r.getLong(1)))
      .toMap

    def partition_offset(partitionId: Int): Long = partitionOffsets(partitionId)
    val partition_offset_udf = udf(partition_offset _)

    // and re-number the index
    dfWithPartitionId
      .withColumn("partition_offset", partition_offset_udf(col("partition_id")))
      .withColumn(indexName, col("partition_offset") + col("inc_id"))
      .drop("partition_id")
      .drop("partition_offset")
      .drop("inc_id")
  }
}
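A hedged usage sketch for this updated version (the key column, partition count and index name below are invented for illustration):

// Repartitioning on a skewed key can leave some partitions empty -- exactly the
// situation the Map(partitionId -> offset) lookup above is meant to survive.
val skewed  = df.repartition(64, col("some_key"))   // "some_key" is a hypothetical column
val indexed = DataframeUtils.zipWithIndex(skewed, offset = 1L, indexName = "row_idx")
indexed.select("row_idx", "some_key").orderBy("row_idx").show(5)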
Spark Java API version:
I have implemented @Evgeny's solution for performing zipWithIndex on DataFrames in Java and want to share the code.
It also contains the improvements offered by @fylb in his solution. I can confirm for Spark 2.4 that the execution fails when the entries returned by spark_partition_id() do not start with 0 or do not increase sequentially. As this function is documented to be non-deterministic, it is very likely that one of the above cases will occur. One example is triggered by increasing the partition count.
The Java implementation follows:
public static Dataset<Row> zipWithIndex(Dataset<Row> df, Long offset, String indexName) {
    Dataset<Row> dfWithPartitionId = df
            .withColumn("partition_id", spark_partition_id())
            .withColumn("inc_id", monotonically_increasing_id());

    Object partitionOffsetsObject = dfWithPartitionId
            .groupBy("partition_id")
            .agg(count(lit(1)).alias("cnt"), first("inc_id").alias("inc_id"))
            .orderBy("partition_id")
            .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")).minus(col("cnt")).minus(col("inc_id")).plus(lit(offset)).alias("cnt"))
            .collect();
    Row[] partitionOffsetsArray = ((Row[]) partitionOffsetsObject);
    Map<Integer, Long> partitionOffsets = new HashMap<>();
    for (int i = 0; i < partitionOffsetsArray.length; i++) {
        partitionOffsets.put(partitionOffsetsArray[i].getInt(0), partitionOffsetsArray[i].getLong(1));
    }

    UserDefinedFunction getPartitionOffset = udf(
            (UDF1<Integer, Long>) partitionId -> partitionOffsets.get(partitionId), DataTypes.LongType
    );

    return dfWithPartitionId
            .withColumn("partition_offset", getPartitionOffset.apply(col("partition_id")))
            .withColumn(indexName, col("partition_offset").plus(col("inc_id")))
            .drop("partition_id", "partition_offset", "inc_id");
}
I have modified @Tagar's version to run on Python 3.7 and wanted to share it:
from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
    and preserves a schema

    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''

    new_schema = StructType(
        [StructField(colName,LongType(),True)]    # new added field in front
        + df.schema.fields                        # previous schema
    )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))   # use this for python 3+, tuple gets passed as single argument so using args and [] notation to read elements within args

    return spark.createDataFrame(new_rdd, new_schema)
Here is my proposal, whose advantages are:
- It does not involve any serialization/deserialization[1] of our DataFrame's rows.
- Its logic is minimalist: it relies only on RDD.zipWithIndex.

Its main drawbacks are:
- It cannot be used directly from non-JVM APIs (pySpark, SparkR).
- It has to live under package org.apache.spark.sql.
Imports:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.functions.lit
/**
* Optimized Spark SQL equivalent of RDD.zipWithIndex.
*
* @param df
* @param indexColName
* @return `df` with a column named `indexColName` of consecutive unique ids.
*/
def zipWithIndex(df: DataFrame, indexColName: String = "index"): DataFrame = {
  import df.sparkSession.implicits._

  val dfWithIndexCol: DataFrame = df
    .drop(indexColName)
    .select(lit(0L).as(indexColName), $"*")

  val internalRows: RDD[InternalRow] = dfWithIndexCol
    .queryExecution
    .toRdd
    .zipWithIndex()
    .map {
      case (internalRow: InternalRow, index: Long) =>
        internalRow.setLong(0, index)
        internalRow
    }

  Dataset.ofRows(
    df.sparkSession,
    LogicalRDD(dfWithIndexCol.schema.toAttributes, internalRows)(df.sparkSession)
  )
}
[1]: (from/to InternalRow's underlying array of bytes <--> GenericRow's underlying collection of JVM objects, Array[Any]).
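A minimal hedged usage sketch, assuming the def above is placed inside some enclosing object under package org.apache.spark.sql (the object name, the DataFrame and the column name below are my own, not part of the original):

// Hypothetical enclosing object: object DataFrameZipWithIndex { /* zipWithIndex from above */ }
import org.apache.spark.sql.DataFrameZipWithIndex.zipWithIndex

val indexed = zipWithIndex(df, indexColName = "row_idx")
indexed.select("row_idx").show(5)   // consecutive ids starting at 0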