火花累加器值不递增

Spark Accumalator value not incrementing

我最近一直在研究 Spark 数据集,我有一个场景 我必须为每一行生成行号并将其存储在名为 "Ids".[=40= 的列中] 此行号从 1、2、3... 开始,并根据数据集中的行数递增。 (在我的例子中有 10000-20000 条记录)

考虑一下,我有一个数据集 'empDataset',其值为:

name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz

现在,对于上面的数据集,我想添加一个列 'Ids',值从 1,2,3.. 递增。

预期的输出是这样的

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3

我还想将此输出存储在另一个数据集中,并进一步将其用于不同的转换。

需要帮助解决这个问题陈述。!!

我的代码片段:

LongAccumulator  accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {

            @Override
            public Long call(String namCol) throws Exception {
                    accum.add(rowNumber);
                    System.out.println("inside" + accum.value());
                    return accum.value();
                }
        }, DataTypes.LongType);

Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator",
                col(name)));

Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)

我得到的输出是 empDatasetWithIds(输出不正确):

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1

以上代码在 运行 本地模式下工作正常,但在集群模式下值不增加。

我还浏览了以下链接: https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html

火花蓄能器需要一个动作来触发作业。在我的场景中,我正在对数据集进一步进行过滤转换,我该如何解决这个问题。需要帮助。

对于此功能,您可以使用 row_number

import org.apache.spark.sql.expressions.Window
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids", 
    row_number().over(Window.orderBy(col("name"), col("dept"), col("project)))
)

参考:

正如评论中指出的那样,使用 Window 没有分区是非常低效的。并且应该避免在生产代码中处理大数据。

您使用累加器的方法不起作用(如 中所述),因为 spark 在不同的执行器中运行此代码(不同的 jvm 进程 运行 在不同的机器上),并且每个执行器都有自己的自己的副本如果累加器。

如果顺序升序不是问题,您可以按照以下简单操作:

import org.apache.spark.sql.functions.monotonically_increasing_id 
import spark.implicits._

val ds = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS   // Just a dummy DS

val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())

newds.show(false)

尝试一下,适应自己的情况。

顺便说一句:累加器使用错误。

累加器是用于在执行器之间累加数据并将其发送回驱动程序的变量。如果您从执行程序读取它的值,则行为未定义(AFAIK)。我想您可能会得到到目前为止为本地分区积累的内容。事实上,spark 的目标是进行并行计算。因此,当使用累加器时,每个分区的数据都在一个单独的累加器中累加,然后合并并发送回驱动程序(map reduce 范例)。所以你不能使用累加器在执行者之间共享信息。这不是它的意思

然而,你可以做的是,如果你需要连续的索引,则使用 RDD API 中的 zipWithIndex,或者如果你只是使用 SparkSQL API 中的 monoticallyIncreasingId需要增加指数。前者触发一个小的 spark 作业,而后者几乎是免费的(没有 spark 作业)。

方案一(递增但不一定连续的指数)

yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());

选项 2(连续递增指数)

StructType schema = yourDataframe.schema();
schema.add(new StructField("id", DataTypes.LongType, false,null));
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
    .map(x -> {
         Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq());
         Long index = x._2;
         row.add(index);
         return RowFactory.create(row);
    });
Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);