Spark 使用不同的 TTL 写入 Cassandra
Spark writing to Cassandra with varying TTL
在 Java Spark 中,我有一个包含 'bucket_timestamp' 列的数据框,它表示该行所属的存储桶的时间。
我想将数据帧写入 Cassandra 数据库。必须使用 TTL 将数据写入 DB。 TTL 应取决于存储桶时间戳 - 其中每行的 TTL 应计算为 ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp)
,其中 CONST_TTL
是我配置的常量 TTL。
目前我正在使用常量 TTL 使用 spark 写入 Cassandra,代码如下:
df.write().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "key_space_name");
put("table, "table_name");
put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
}
}).mode(SaveMode.Overwrite).save();
我想到的一种可能的方法是 - 对于每个可能的 bucket_timestamp - 根据时间戳过滤数据,计算 TTL 并将过滤后的数据写入 Cassandra。但这似乎非常低效,而不是火花方式。 Java Spark 中有没有办法提供一个 spark 列作为 TTL 选项,以便每一行的 TTL 都不同?
解决方案应该使用 Java 和数据集<行>:我遇到了一些在 Scala 中使用 RDD 执行此操作的解决方案,但没有找到使用 Java 和数据框的解决方案。
谢谢!
从 Spark-Cassandra 连接器选项 (https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java) 您可以将 TTL 设置为:
- 恒定值 (
withConstantTTL
)
- 自动解析值(
withAutoTTL
)
- 基于列的值 (
withPerRowTTL
)
在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则将 TTL 计算为起始 Dataset
的新列。
对于 DataFrame API 不支持此类功能,但是...有 JIRA 用于它 - https://datastax-oss.atlassian.net/browse/SPARKC-416,您可以观看它以在实现时收到通知...
因此,您唯一的选择就是按照@bartosz25 的回答中所述使用 RDD API...
在 Java Spark 中,我有一个包含 'bucket_timestamp' 列的数据框,它表示该行所属的存储桶的时间。
我想将数据帧写入 Cassandra 数据库。必须使用 TTL 将数据写入 DB。 TTL 应取决于存储桶时间戳 - 其中每行的 TTL 应计算为 ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp)
,其中 CONST_TTL
是我配置的常量 TTL。
目前我正在使用常量 TTL 使用 spark 写入 Cassandra,代码如下:
df.write().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "key_space_name");
put("table, "table_name");
put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
}
}).mode(SaveMode.Overwrite).save();
我想到的一种可能的方法是 - 对于每个可能的 bucket_timestamp - 根据时间戳过滤数据,计算 TTL 并将过滤后的数据写入 Cassandra。但这似乎非常低效,而不是火花方式。 Java Spark 中有没有办法提供一个 spark 列作为 TTL 选项,以便每一行的 TTL 都不同?
解决方案应该使用 Java 和数据集<行>:我遇到了一些在 Scala 中使用 RDD 执行此操作的解决方案,但没有找到使用 Java 和数据框的解决方案。
谢谢!
从 Spark-Cassandra 连接器选项 (https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java) 您可以将 TTL 设置为:
- 恒定值 (
withConstantTTL
) - 自动解析值(
withAutoTTL
) - 基于列的值 (
withPerRowTTL
)
在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则将 TTL 计算为起始 Dataset
的新列。
对于 DataFrame API 不支持此类功能,但是...有 JIRA 用于它 - https://datastax-oss.atlassian.net/browse/SPARKC-416,您可以观看它以在实现时收到通知...
因此,您唯一的选择就是按照@bartosz25 的回答中所述使用 RDD API...