如何在 spark 2.3.1 api 中使用累加器
How to use accumulators with spark 2.3.1 api
我正在使用 spark-sql_2.11-2.3.1 版本的 Cassandra 3.x。
我需要提供具有
的验证功能
column_family_name text,
oracle_count bigint,
cassandra_count bigint,
create_timestamp timestamp,
last_update_timestamp timestamp,
update_user text
同样,我需要计算成功插入的记录数,即要填充的 cassandra_count,为此我想使用 spark 累加器。但不幸的是,我无法找到 spark-sql_2.11-2.3.1 版本所需的 API 样本。
下面是我保存到 cassandra 的片段
o_model_df.write.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
.mode(SaveMode.Append)
.save()
这里是如何为成功保存到 Cassandra 中的每一行实现累加器增量 ...
如有帮助,将不胜感激。
Spark 的累加器通常用于您编写的转换,不要指望 spark cassandra 连接器会为您提供类似的东西。
但总的来说 - 如果您的作业没有错误地完成,则意味着数据已正确写入数据库。
如果你想检查数据库中实际有多少行,那么你需要统计数据库中的数据——你可以使用spark cassandra连接器的cassandraCount方法。这样做的主要原因 - 您的 DataFrame 中可能有多行可以映射到单个 Cassandra 行(例如,如果您错误地定义了主键,那么多行都有它)。
我正在使用 spark-sql_2.11-2.3.1 版本的 Cassandra 3.x。 我需要提供具有
的验证功能 column_family_name text,
oracle_count bigint,
cassandra_count bigint,
create_timestamp timestamp,
last_update_timestamp timestamp,
update_user text
同样,我需要计算成功插入的记录数,即要填充的 cassandra_count,为此我想使用 spark 累加器。但不幸的是,我无法找到 spark-sql_2.11-2.3.1 版本所需的 API 样本。
下面是我保存到 cassandra 的片段
o_model_df.write.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
.mode(SaveMode.Append)
.save()
这里是如何为成功保存到 Cassandra 中的每一行实现累加器增量 ...
如有帮助,将不胜感激。
Spark 的累加器通常用于您编写的转换,不要指望 spark cassandra 连接器会为您提供类似的东西。
但总的来说 - 如果您的作业没有错误地完成,则意味着数据已正确写入数据库。
如果你想检查数据库中实际有多少行,那么你需要统计数据库中的数据——你可以使用spark cassandra连接器的cassandraCount方法。这样做的主要原因 - 您的 DataFrame 中可能有多行可以映射到单个 Cassandra 行(例如,如果您错误地定义了主键,那么多行都有它)。