更改源的 Spark Dataframe.cache() 行为
Spark Dataframe.cache() behavior for changing source
我的用例:
- 从 cassandra 创建数据框 table。
- 通过对列进行过滤并修改该列的值来创建输出数据框。
- 将输出数据帧写入带有TTL设置的cassandra,因此所有修改的记录在短时间(2s)后被删除
- Return 输出数据帧到调用者,一段时间后将其写入文件系统。我只能 return 一个数据帧给调用者,我没有进一步的控制。另外,我不能增加TTL。
到执行第4步时,输出数据帧为空。这是因为,spark 重新评估操作上的数据框,并且由于沿袭,cassandra 查询再次完成,现在不产生任何记录。
为了避免这种情况,我在第 2 步之后添加了一个步骤:
2a) outputDataframe.cache()
这确保在第 5 步期间,cassandra 不会被查询,并且我的文件中也会获得所需的输出记录。我对这种方法有以下疑问:
- 是否有可能,如果 spark 找不到缓存数据(缓存查找失败),它会向上沿袭和 运行 cassandra 查询?如果是,在所有情况下如何避免这种情况?
- 我看到了另一种缓存方式:
df.rdd.cache()
。这与在数据帧上调用 cache()
有什么不同吗?
作为参考,我当前的代码如下所示:
//1
val dfOrig = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
.load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
.mode("append")
.save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv")
Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?
是的,这是可能的。缓存数据可以被缓存清理器移除(主要是在MEMORY_ONLY
模式下),当相应的节点退役(崩溃,抢占,动态分配释放)时可以丢失。此外,其他选项(如推测执行)可能会影响缓存行为。
最后,数据可能没有完全缓存。
If yes, what is the way to avoid that in all cases?
如果您需要强大的一致性保证,请不要使用 cache
/ persist
- 它在设计时并未考虑到此类用例。而是将数据导出到持久、可靠的存储(如 HDFS)并从那里读取。
您还可以将 checkpoint
与 HDFS checkpointDir
一起使用。
您可能会想使用更可靠的缓存模式,例如 MEMORY_AND_DISK_2
- 这可能会降低重新计算数据的可能性,但代价是
df.rdd.cache(). Is this any different than calling cache() on the dataframe?
它是不同的(主要区别是序列化策略),但当涉及到此问题范围内感兴趣的属性时则不同。
重要:
请注意,缓存行为可能不是您代码中的最大问题。读取和附加到单个 table 可能会导致复杂管道中出现各种不需要或未定义的行为,除非采取额外的步骤来确保 reader 不会选择新写入的记录。
我的用例:
- 从 cassandra 创建数据框 table。
- 通过对列进行过滤并修改该列的值来创建输出数据框。
- 将输出数据帧写入带有TTL设置的cassandra,因此所有修改的记录在短时间(2s)后被删除
- Return 输出数据帧到调用者,一段时间后将其写入文件系统。我只能 return 一个数据帧给调用者,我没有进一步的控制。另外,我不能增加TTL。
到执行第4步时,输出数据帧为空。这是因为,spark 重新评估操作上的数据框,并且由于沿袭,cassandra 查询再次完成,现在不产生任何记录。
为了避免这种情况,我在第 2 步之后添加了一个步骤:
2a) outputDataframe.cache()
这确保在第 5 步期间,cassandra 不会被查询,并且我的文件中也会获得所需的输出记录。我对这种方法有以下疑问:
- 是否有可能,如果 spark 找不到缓存数据(缓存查找失败),它会向上沿袭和 运行 cassandra 查询?如果是,在所有情况下如何避免这种情况?
- 我看到了另一种缓存方式:
df.rdd.cache()
。这与在数据帧上调用cache()
有什么不同吗?
作为参考,我当前的代码如下所示:
//1
val dfOrig = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
.load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
.mode("append")
.save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv")
Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?
是的,这是可能的。缓存数据可以被缓存清理器移除(主要是在MEMORY_ONLY
模式下),当相应的节点退役(崩溃,抢占,动态分配释放)时可以丢失。此外,其他选项(如推测执行)可能会影响缓存行为。
最后,数据可能没有完全缓存。
If yes, what is the way to avoid that in all cases?
如果您需要强大的一致性保证,请不要使用 cache
/ persist
- 它在设计时并未考虑到此类用例。而是将数据导出到持久、可靠的存储(如 HDFS)并从那里读取。
您还可以将 checkpoint
与 HDFS checkpointDir
一起使用。
您可能会想使用更可靠的缓存模式,例如 MEMORY_AND_DISK_2
- 这可能会降低重新计算数据的可能性,但代价是
df.rdd.cache(). Is this any different than calling cache() on the dataframe?
它是不同的(主要区别是序列化策略),但当涉及到此问题范围内感兴趣的属性时则不同。
重要:
请注意,缓存行为可能不是您代码中的最大问题。读取和附加到单个 table 可能会导致复杂管道中出现各种不需要或未定义的行为,除非采取额外的步骤来确保 reader 不会选择新写入的记录。