使用 TTL 将数据从一个 Cassandra table 复制到另一个
Copying data from one Cassandra table to another with TTL
我们正在更改 table 之一的分区键,方法是从分区键中删除一列。此 table 中的每条记录也有 TTL。现在我们想用 TTL 保存 table 中的数据。
我们该怎么做?
我们可以创建具有所需架构的新 table,然后将数据从旧 table 复制到新 table。但是,我们在这个过程中松开了 TTL。
有关更多信息 - 此 Cassandra table 由 Apache Storm 应用程序填充,该应用程序从 Kafka 读取事件。我们可以重新水化 Kafka 消息,但 Kafka 有一些我们不想处理的不需要的消息。
注意 - TTL 是根据日期列值决定的,该值永远不会改变。由于此 TTL 在所有列上始终相同。
在进行具体实施之前,重要的是要了解 TTL 可能存在于单个单元格以及行中的所有单元格上。当您执行 INSERT 或 UPDATE 操作时,您只能为查询中指定的所有列应用一个 TTL 值,因此如果您有 2 个具有不同 TTL 的列,那么您将需要执行 2 个查询 - 对于每个列,具有不同的 TTL。
关于工具 - 这里有 2 个或多或少的现成选项:
- 使用DSBulk. This approach is described in details in the example 30.1 of this blog post。基本上,您需要使用将为它们提取列值和 TTL 的查询将数据卸载到磁盘,然后通过为具有单独 TTL 的每个列生成批处理来加载数据。来自示例:
dsbulk unload -h localhost -query \
"SELECT id, petal_length, WRITETIME(petal_length) AS w_petal_length, TTL(petal_length) AS l_petal_length, .... FROM dsbulkblog.iris_with_id" \
-url /tmp/dsbulkblog/migrate
dsbulk load -h localhost -query \
"BEGIN BATCH INSERT INTO dsbulkblog.iris_with_id(id, petal_length) VALUES (:id, :petal_length) USING TIMESTAMP :w_petal_length AND TTL :l_petal_length; ... APPLY BATCH;" \
-url /tmp/dsbulkblog/migrate --batch.mode DISABLED
- 使用Spark Cassandra Connector - it supports reading & writing the data with TTL & WriteTime. But you'll need to develop the code that is doing it, and correctly handle things such as collections, static columns etc. (or wait for SPARKC-596实现)
我们正在更改 table 之一的分区键,方法是从分区键中删除一列。此 table 中的每条记录也有 TTL。现在我们想用 TTL 保存 table 中的数据。 我们该怎么做?
我们可以创建具有所需架构的新 table,然后将数据从旧 table 复制到新 table。但是,我们在这个过程中松开了 TTL。
有关更多信息 - 此 Cassandra table 由 Apache Storm 应用程序填充,该应用程序从 Kafka 读取事件。我们可以重新水化 Kafka 消息,但 Kafka 有一些我们不想处理的不需要的消息。
注意 - TTL 是根据日期列值决定的,该值永远不会改变。由于此 TTL 在所有列上始终相同。
在进行具体实施之前,重要的是要了解 TTL 可能存在于单个单元格以及行中的所有单元格上。当您执行 INSERT 或 UPDATE 操作时,您只能为查询中指定的所有列应用一个 TTL 值,因此如果您有 2 个具有不同 TTL 的列,那么您将需要执行 2 个查询 - 对于每个列,具有不同的 TTL。
关于工具 - 这里有 2 个或多或少的现成选项:
- 使用DSBulk. This approach is described in details in the example 30.1 of this blog post。基本上,您需要使用将为它们提取列值和 TTL 的查询将数据卸载到磁盘,然后通过为具有单独 TTL 的每个列生成批处理来加载数据。来自示例:
dsbulk unload -h localhost -query \
"SELECT id, petal_length, WRITETIME(petal_length) AS w_petal_length, TTL(petal_length) AS l_petal_length, .... FROM dsbulkblog.iris_with_id" \
-url /tmp/dsbulkblog/migrate
dsbulk load -h localhost -query \
"BEGIN BATCH INSERT INTO dsbulkblog.iris_with_id(id, petal_length) VALUES (:id, :petal_length) USING TIMESTAMP :w_petal_length AND TTL :l_petal_length; ... APPLY BATCH;" \
-url /tmp/dsbulkblog/migrate --batch.mode DISABLED
- 使用Spark Cassandra Connector - it supports reading & writing the data with TTL & WriteTime. But you'll need to develop the code that is doing it, and correctly handle things such as collections, static columns etc. (or wait for SPARKC-596实现)