火花累加器值不递增
Spark Accumalator value not incrementing
我最近一直在研究 Spark 数据集,我有一个场景 我必须为每一行生成行号并将其存储在名为 "Ids".[=40= 的列中] 此行号从 1、2、3... 开始,并根据数据集中的行数递增。 (在我的例子中有 10000-20000 条记录)
考虑一下,我有一个数据集 'empDataset',其值为:
name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz
现在,对于上面的数据集,我想添加一个列 'Ids',值从 1,2,3.. 递增。
预期的输出是这样的
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3
我还想将此输出存储在另一个数据集中,并进一步将其用于不同的转换。
需要帮助解决这个问题陈述。!!
我的代码片段:
LongAccumulator accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;
spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {
@Override
public Long call(String namCol) throws Exception {
accum.add(rowNumber);
System.out.println("inside" + accum.value());
return accum.value();
}
}, DataTypes.LongType);
Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator",
col(name)));
Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)
我得到的输出是 empDatasetWithIds(输出不正确):
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1
以上代码在 运行 本地模式下工作正常,但在集群模式下值不增加。
我还浏览了以下链接:
https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html
火花蓄能器需要一个动作来触发作业。在我的场景中,我正在对数据集进一步进行过滤转换,我该如何解决这个问题。需要帮助。
对于此功能,您可以使用 row_number
import org.apache.spark.sql.expressions.Window
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;
Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
row_number().over(Window.orderBy(col("name"), col("dept"), col("project)))
)
参考:
正如评论中指出的那样,使用 Window 没有分区是非常低效的。并且应该避免在生产代码中处理大数据。
您使用累加器的方法不起作用(如 中所述),因为 spark 在不同的执行器中运行此代码(不同的 jvm 进程 运行 在不同的机器上),并且每个执行器都有自己的自己的副本如果累加器。
如果顺序升序不是问题,您可以按照以下简单操作:
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._
val ds = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS // Just a dummy DS
val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())
newds.show(false)
尝试一下,适应自己的情况。
顺便说一句:累加器使用错误。
累加器是用于在执行器之间累加数据并将其发送回驱动程序的变量。如果您从执行程序读取它的值,则行为未定义(AFAIK)。我想您可能会得到到目前为止为本地分区积累的内容。事实上,spark 的目标是进行并行计算。因此,当使用累加器时,每个分区的数据都在一个单独的累加器中累加,然后合并并发送回驱动程序(map reduce 范例)。所以你不能使用累加器在执行者之间共享信息。这不是它的意思
然而,你可以做的是,如果你需要连续的索引,则使用 RDD API 中的 zipWithIndex
,或者如果你只是使用 SparkSQL API 中的 monoticallyIncreasingId
需要增加指数。前者触发一个小的 spark 作业,而后者几乎是免费的(没有 spark 作业)。
方案一(递增但不一定连续的指数)
yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());
选项 2(连续递增指数)
StructType schema = yourDataframe.schema();
schema.add(new StructField("id", DataTypes.LongType, false,null));
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
.map(x -> {
Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq());
Long index = x._2;
row.add(index);
return RowFactory.create(row);
});
Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);
我最近一直在研究 Spark 数据集,我有一个场景 我必须为每一行生成行号并将其存储在名为 "Ids".[=40= 的列中] 此行号从 1、2、3... 开始,并根据数据集中的行数递增。 (在我的例子中有 10000-20000 条记录)
考虑一下,我有一个数据集 'empDataset',其值为:
name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz
现在,对于上面的数据集,我想添加一个列 'Ids',值从 1,2,3.. 递增。
预期的输出是这样的
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3
我还想将此输出存储在另一个数据集中,并进一步将其用于不同的转换。
需要帮助解决这个问题陈述。!!
我的代码片段:
LongAccumulator accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;
spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {
@Override
public Long call(String namCol) throws Exception {
accum.add(rowNumber);
System.out.println("inside" + accum.value());
return accum.value();
}
}, DataTypes.LongType);
Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator",
col(name)));
Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)
我得到的输出是 empDatasetWithIds(输出不正确):
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1
以上代码在 运行 本地模式下工作正常,但在集群模式下值不增加。
我还浏览了以下链接:
https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html
火花蓄能器需要一个动作来触发作业。在我的场景中,我正在对数据集进一步进行过滤转换,我该如何解决这个问题。需要帮助。
对于此功能,您可以使用 row_number
import org.apache.spark.sql.expressions.Window
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;
Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
row_number().over(Window.orderBy(col("name"), col("dept"), col("project)))
)
参考:
正如评论中指出的那样,使用 Window 没有分区是非常低效的。并且应该避免在生产代码中处理大数据。
您使用累加器的方法不起作用(如
如果顺序升序不是问题,您可以按照以下简单操作:
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._
val ds = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS // Just a dummy DS
val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())
newds.show(false)
尝试一下,适应自己的情况。
顺便说一句:累加器使用错误。
累加器是用于在执行器之间累加数据并将其发送回驱动程序的变量。如果您从执行程序读取它的值,则行为未定义(AFAIK)。我想您可能会得到到目前为止为本地分区积累的内容。事实上,spark 的目标是进行并行计算。因此,当使用累加器时,每个分区的数据都在一个单独的累加器中累加,然后合并并发送回驱动程序(map reduce 范例)。所以你不能使用累加器在执行者之间共享信息。这不是它的意思
然而,你可以做的是,如果你需要连续的索引,则使用 RDD API 中的 zipWithIndex
,或者如果你只是使用 SparkSQL API 中的 monoticallyIncreasingId
需要增加指数。前者触发一个小的 spark 作业,而后者几乎是免费的(没有 spark 作业)。
方案一(递增但不一定连续的指数)
yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());
选项 2(连续递增指数)
StructType schema = yourDataframe.schema();
schema.add(new StructField("id", DataTypes.LongType, false,null));
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
.map(x -> {
Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq());
Long index = x._2;
row.add(index);
return RowFactory.create(row);
});
Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);