如何在 spark 2.3.1 api 中使用累加器

How to use accumulators with spark 2.3.1 api

我正在使用 spark-sql_2.11-2.3.1 版本的 Cassandra 3.x。 我需要提供具有

的验证功能
   column_family_name text,
    oracle_count bigint,
    cassandra_count bigint,
    create_timestamp timestamp,
    last_update_timestamp timestamp,
    update_user text

同样,我需要计算成功插入的记录数,即要填充的 cassandra_count,为此我想使用 spark 累加器。但不幸的是,我无法找到 spark-sql_2.11-2.3.1 版本所需的 API 样本。

下面是我保存到 cassandra 的片段

 o_model_df.write.format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
    .mode(SaveMode.Append)
    .save()

这里是如何为成功保存到 Cassandra 中的每一行实现累加器增量 ...

如有帮助,将不胜感激。

Spark 的累加器通常用于您编写的转换,不要指望 spark cassandra 连接器会为您提供类似的东西。

但总的来说 - 如果您的作业没有错误地完成,则意味着数据已正确写入数据库。

如果你想检查数据库中实际有多少行,那么你需要统计数据库中的数据——你可以使用spark cassandra连接器的cassandraCount方法。这样做的主要原因 - 您的 DataFrame 中可能有多行可以映射到单个 Cassandra 行(例如,如果您错误地定义了主键,那么多行都有它)。