为什么写入新 Cassandra 之前和之后 Spark DataFrames 中的元素数量不同 table?

Why is there different number of elements in Spark DataFrames before and after writing to a new Cassandra table?

在我的代码中,我将现有 Cassandra table 中的数据读取到 Spark DataFrame 中,并将其转换为使用原始数据的反向映射构建一组新的 tables(最终目标是为来自 REST API).

的搜索查询提供服务

最近加了一些追踪,发现了一个我无法解释的事情。 下面是一段 Scala 代码来说明这个问题。

// df: org.apache.spark.sql.DataFrame
//
// control point 1: before writing the data to Cassandra
val inputCount = df.count
// write data to new C* table
df.createCassandraTable(keyspaceName, tableName, <otherArgs>)
df.write.mode("append").cassandraFormat(tableName, keyspaceName).save()

// read data back
val readbackDf = sqlContext.read.cassandraFormat(tableName, keyspaceName).load().cache
// control point 2: data written to C* table
val outputCount = readbackDf.count

// Produces different numbers
println(s"Input count = ${inputCount}; output count = ${outputCount}")

如果我在将数据写入新创建的 table 之前计算数据帧的 .count,它与我通过回读得到的数据帧的 .count 不同新 table.

因此,我有 2 个问题:

  1. 为什么我观察到 inputCountoutputCount 的值不同?
  2. 如果我在上面的代码中使用错误的方法来计算 outputCount,那么正确的方法是什么?

问题确实与Cassandra一致性设置有关。 非常感谢Anurag指出的人

事实证明,在我的测试环境中,我对读取和写入策略都使用了默认值,即 LOCAL_ONE。所以这很容易解释分歧。

我最终将它们都设置为 LOCAL_QUORUM:

spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_QUORUM

话虽如此,我想指出的是,我也尝试将只读设置为 LOCAL_QUORUM

spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_ONE

几乎消除了分歧。

然而,我仍然能够观察到这些设置 有时 (3-4 次运行中有一个)与我的一些 ETL 作业的小差异。

虽然我没有看到 reads/writes 一致性设置为 LOCAL_QUORUM 的显着性能下降,所以这个问题不再阻止我,但我仍然很好奇为什么设置只读LOCAL_QUORUM 并不能完全解决问题。

有人可以建议 "for-dummies" 对此的解释吗?