Spark Cassandra 连接器未将所有记录添加到数据库
Spark Cassandra Connector not adding all records to DB
我使用的版本号:com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3
我有一个来自 kafka 流的 RDD:
kafkaStream.foreachRDD((rdd: RDD[String]) => {
if(rdd.count > 0) {
println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages.");
sqlContext.read.json(rdd)
.select("count_metadata.tran_id")
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "tmp", "keyspace" -> "kspace"))
.mode(SaveMode.Append)
.save();
} else {
println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.");
}
});
RDD 计数约为 40K,但 spark 连接器仅使用一致的 457 条记录填充数据库。
sqlContext.read.json(rdd).select("count_metadata.tran_id").count
还打印 40k 条记录。
这是我的 table 声明:
cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);
每条消息的 tran_id 都是唯一的。
我错过了什么?为什么不是所有 40k 条记录都达到 table?
我的日志也没有显示任何异常。
The tran_id is unique for each message.
我撒谎了:
println(df.distinct.count);
打印....
457
是时候将其提交给我们的上游资源了。
我使用的版本号:com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3
我有一个来自 kafka 流的 RDD:
kafkaStream.foreachRDD((rdd: RDD[String]) => {
if(rdd.count > 0) {
println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages.");
sqlContext.read.json(rdd)
.select("count_metadata.tran_id")
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "tmp", "keyspace" -> "kspace"))
.mode(SaveMode.Append)
.save();
} else {
println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.");
}
});
RDD 计数约为 40K,但 spark 连接器仅使用一致的 457 条记录填充数据库。
sqlContext.read.json(rdd).select("count_metadata.tran_id").count
还打印 40k 条记录。
这是我的 table 声明:
cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);
每条消息的 tran_id 都是唯一的。
我错过了什么?为什么不是所有 40k 条记录都达到 table?
我的日志也没有显示任何异常。
The tran_id is unique for each message.
我撒谎了:
println(df.distinct.count);
打印....
457
是时候将其提交给我们的上游资源了。