为什么写入新 Cassandra 之前和之后 Spark DataFrames 中的元素数量不同 table?
Why is there different number of elements in Spark DataFrames before and after writing to a new Cassandra table?
在我的代码中,我将现有 Cassandra table 中的数据读取到 Spark DataFrame 中,并将其转换为使用原始数据的反向映射构建一组新的 tables(最终目标是为来自 REST API).
的搜索查询提供服务
最近加了一些追踪,发现了一个我无法解释的事情。
下面是一段 Scala 代码来说明这个问题。
// df: org.apache.spark.sql.DataFrame
//
// control point 1: before writing the data to Cassandra
val inputCount = df.count
// write data to new C* table
df.createCassandraTable(keyspaceName, tableName, <otherArgs>)
df.write.mode("append").cassandraFormat(tableName, keyspaceName).save()
// read data back
val readbackDf = sqlContext.read.cassandraFormat(tableName, keyspaceName).load().cache
// control point 2: data written to C* table
val outputCount = readbackDf.count
// Produces different numbers
println(s"Input count = ${inputCount}; output count = ${outputCount}")
如果我在将数据写入新创建的 table 之前计算数据帧的 .count
,它与我通过回读得到的数据帧的 .count
不同新 table.
因此,我有 2 个问题:
- 为什么我观察到
inputCount
和 outputCount
的值不同?
- 如果我在上面的代码中使用错误的方法来计算
outputCount
,那么正确的方法是什么?
问题确实与Cassandra一致性设置有关。
非常感谢Anurag指出的人
事实证明,在我的测试环境中,我对读取和写入策略都使用了默认值,即 LOCAL_ONE
。所以这很容易解释分歧。
我最终将它们都设置为 LOCAL_QUORUM
:
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_QUORUM
话虽如此,我想指出的是,我也尝试将只读设置为 LOCAL_QUORUM
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_ONE
几乎消除了分歧。
然而,我仍然能够观察到这些设置 有时 (3-4 次运行中有一个)与我的一些 ETL 作业的小差异。
虽然我没有看到 reads/writes 一致性设置为 LOCAL_QUORUM
的显着性能下降,所以这个问题不再阻止我,但我仍然很好奇为什么设置只读LOCAL_QUORUM
并不能完全解决问题。
有人可以建议 "for-dummies" 对此的解释吗?
在我的代码中,我将现有 Cassandra table 中的数据读取到 Spark DataFrame 中,并将其转换为使用原始数据的反向映射构建一组新的 tables(最终目标是为来自 REST API).
的搜索查询提供服务最近加了一些追踪,发现了一个我无法解释的事情。 下面是一段 Scala 代码来说明这个问题。
// df: org.apache.spark.sql.DataFrame
//
// control point 1: before writing the data to Cassandra
val inputCount = df.count
// write data to new C* table
df.createCassandraTable(keyspaceName, tableName, <otherArgs>)
df.write.mode("append").cassandraFormat(tableName, keyspaceName).save()
// read data back
val readbackDf = sqlContext.read.cassandraFormat(tableName, keyspaceName).load().cache
// control point 2: data written to C* table
val outputCount = readbackDf.count
// Produces different numbers
println(s"Input count = ${inputCount}; output count = ${outputCount}")
如果我在将数据写入新创建的 table 之前计算数据帧的 .count
,它与我通过回读得到的数据帧的 .count
不同新 table.
因此,我有 2 个问题:
- 为什么我观察到
inputCount
和outputCount
的值不同? - 如果我在上面的代码中使用错误的方法来计算
outputCount
,那么正确的方法是什么?
问题确实与Cassandra一致性设置有关。 非常感谢Anurag指出的人
事实证明,在我的测试环境中,我对读取和写入策略都使用了默认值,即 LOCAL_ONE
。所以这很容易解释分歧。
我最终将它们都设置为 LOCAL_QUORUM
:
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_QUORUM
话虽如此,我想指出的是,我也尝试将只读设置为 LOCAL_QUORUM
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_ONE
几乎消除了分歧。
然而,我仍然能够观察到这些设置 有时 (3-4 次运行中有一个)与我的一些 ETL 作业的小差异。
虽然我没有看到 reads/writes 一致性设置为 LOCAL_QUORUM
的显着性能下降,所以这个问题不再阻止我,但我仍然很好奇为什么设置只读LOCAL_QUORUM
并不能完全解决问题。
有人可以建议 "for-dummies" 对此的解释吗?