Cassandra:列出 10 个最近修改的记录

Cassandra: List 10 most recently modified records

我在尝试对我的数据进行建模时遇到问题,以便我可以有效地查询 Cassandra 以获取最近修改的最后 10 个(实际上是任何数字)记录。每条记录都有一个 last_modified_date 列,由应用程序在 inserting/updating 记录时设置。

我已经从这个示例代码中排除了数据列。

主要数据table(每条记录仅包含一行):

CREATE TABLE record (
    record_id int,
    last_modified_by text,
    last_modified_date timestamp,
    PRIMARY KEY (record_id)
);

解决方案 1(失败)

我尝试创建一个单独的 table,它使用了集群键顺序。

Table(每条记录一行;仅插入最后修改日期):

CREATE TABLE record_by_last_modified_index (
    record_id int,
    last_modified_by text,
    last_modified_date timestamp,
    PRIMARY KEY (record_id, last_modified_date)
) WITH CLUSTERING ORDER BY (last_modified_date DESC);

查询:

SELECT * FROM record_by_last_modified_index LIMIT 10

此解决方案不起作用,因为聚类顺序仅适用于具有相同分区键的记录的排序。由于每一行都有不同的分区键 (record_id),查询结果不包括预期的记录。

解决方案 2(低效)

我尝试过的另一个解决方案是简单地查询 Cassandra 以获取所有 record_id 和 last_modified_date 值,对它们进行排序并在我的应用程序中选择前 10 条记录。这显然效率低下并且无法很好地扩展。

解决方案 3

我考虑过的最后一个解决方案是对所有记录使用相同的分区键并使用聚类顺序来确保记录正确排序。该解决方案的问题是数据不会在节点之间正确分区,因为所有记录都具有相同的分区键。对我来说,这似乎是不可能的。

我认为您尝试做的更多是关系数据库模型,在某种程度上是 Cassandra 中的反模式。

Cassandra 仅根据聚类列对事物进行排序,但排序顺序预计不会更改。这是因为当 memtables 作为 SSTables (Sorted String Tables) 写入磁盘时,SSTables 是 immutable 并且无法有效地重新排序。这就是不允许您更新聚簇列值的原因。

如果要对聚集的行重新排序,我知道的唯一方法是删除旧行并批量插入新行。为了使效率更低,您可能需要先阅读以找出 last_modified_date 对 record_id 的意义,以便您可以删除它。

所以我会寻找一种不同的方法,例如只将更新写为新的聚集行并将旧的保留在那里(可能会随着时间的推移使用 TTL 清理它们)。因此,当您执行 LIMIT 查询时,您的最新更新将始终位于最前面。

在分区方面,您需要将数据分成几类,以便将数据分布到您的节点上。这意味着您不会对 table 进行全局排序,而只能在类别内进行排序,这是由于分布式模型造成的。如果您真的需要全局排序,那么也许可以看看将 Cassandra 与 Spark 配对之类的东西。排序非常耗费时间和资源,所以如果你真的需要它,请仔细考虑。

更新:

再考虑一下,您应该可以在 Cassandra 3.0 中使用物化视图来做到这一点。该视图会为您处理混乱的删除和插入,以重新排序聚集的行。所以这就是它在 3.0 alpha 版本中的样子:

首先创建基地table:

CREATE TABLE record_ids (
    record_type int,
    last_modified_date timestamp,
    record_id int,
    PRIMARY KEY(record_type, record_id));

然后创建 table 的视图,使用 last_modified_date 作为集群列:

CREATE MATERIALIZED VIEW last_modified AS
    SELECT record_type FROM record_ids
    WHERE record_type IS NOT NULL AND last_modified_date IS NOT NULL AND record_id IS NOT NULL
    PRIMARY KEY (record_type, last_modified_date, record_id)
    WITH CLUSTERING ORDER BY (last_modified_date DESC);

现在插入一些记录:

insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 100);
insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 200);
insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 300);

SELECT * FROM record_ids;

 record_type | record_id | last_modified_date
-------------+-----------+--------------------------
           1 |       100 | 2015-08-14 19:41:10+0000
           1 |       200 | 2015-08-14 19:41:25+0000
           1 |       300 | 2015-08-14 19:41:41+0000

SELECT * FROM last_modified;

 record_type | last_modified_date       | record_id
-------------+--------------------------+-----------
           1 | 2015-08-14 19:41:41+0000 |       300
           1 | 2015-08-14 19:41:25+0000 |       200
           1 | 2015-08-14 19:41:10+0000 |       100

现在我们更新基础 table 中的一条记录,应该会看到它移到视图列表的顶部:

UPDATE record_ids SET last_modified_date = dateof(now()) 
WHERE record_type=1 AND record_id=200;

因此在基础 table 中,我们看到 record_id=200 的时间戳已更新:

SELECT * FROM record_ids;

 record_type | record_id | last_modified_date
-------------+-----------+--------------------------
           1 |       100 | 2015-08-14 19:41:10+0000
           1 |       200 | 2015-08-14 19:43:13+0000
           1 |       300 | 2015-08-14 19:41:41+0000

在视图中,我们看到:

 SELECT * FROM last_modified;

 record_type | last_modified_date       | record_id
-------------+--------------------------+-----------
           1 | 2015-08-14 19:43:13+0000 |       200
           1 | 2015-08-14 19:41:41+0000 |       300
           1 | 2015-08-14 19:41:10+0000 |       100

所以你看到 record_id=200 在视图中向上移动,如果你对那个 table 做一个限制 N,你会得到 N 个最近修改的行。

CQL 查询按字段排序的整个 table/view 的唯一方法是使分区键保持不变。恰好一台机器(乘以复制因子)将容纳整个 table。例如。使用始终为零的 partition INT 分区键和集群键作为需要排序的字段。即使您的集群中有更多节点,您也应该观察到 read/write/capacity 性能类似于单节点数据库,在排序字段上有一个索引。这并没有完全违背 Cassandra 的目的,因为它可以帮助将来扩展。

如果性能不够,那么您可以决定通过增加分区种类来扩展。例如。当使用 4 个节点时,从 0、1、2、3 中随机选择插入将达到四倍 read/write/capacity 性能。然后要找到“10 个最新”项目,您必须手动查询所有 4 个分区并对结果进行合并排序。

理论上,Cassandra 可以为 INSERT 提供动态节点计数最大模分区键和 SELECT(使用 ALLOW FILTERING)的合并排序。

Cassandra 的设计目标不允许全局排序

要允许写入、读取和存储容量随节点数线性扩展,Cassandra 需要:

  • 每个插入都落在一个节点上。
  • 每个 select 降落在一个节点上。
  • 客户端在所有节点之间以类似方式分配工作负载。

如果我理解正确,结果是完整的table单字段排序查询将始终需要从整个集群读取并合并排序。

请注意物化视图等同于 tables,它们没有任何神奇的 属性 让它们更擅长全局排序。请参阅 http://www.datastax.com/dev/blog/we-shall-have-order,其中 Aaron Ploetz 同意 cassandra 和 cql 不能在没有分区和缩放的情况下对一个字段进行排序。

示例解决方案

CREATE KEYSPACE IF NOT EXISTS
    tmpsort
WITH REPLICATION =
    {'class':'SimpleStrategy', 'replication_factor' : 1};

USE tmpsort;

CREATE TABLE record_ids (
    partition int,
    last_modified_date timestamp,
    record_id int,
    PRIMARY KEY((partition), last_modified_date, record_id))
    WITH CLUSTERING ORDER BY (last_modified_date DESC);

INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 100);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 101);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 102);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 103);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 104);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 105);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 106);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 107);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 108);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 109);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 110);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 111);

SELECT * FROM record_ids;

-- Note the results are only sorted in their partition
-- To try again:
-- DROP KEYSPACE tmpsort;

请注意,如果没有 WHERE 子句,您将获得令牌(分区键)顺序的结果。参见 https://dba.stackexchange.com/questions/157537/querying-cassandra-without-a-partition-key

其他数据库分布模型

如果我理解正确的话 - CockroachDB 会在任何给定时间将单调递增数据类似地限制在一个节点的性能上 read/write 但存储容量会线性扩展。与 Cassandra 相比,其他范围查询如 "oldest 10" 或 "between date X and date Y" 会将负载分布在更多节点上。这是因为 CockroachDB 的数据库是一个巨大的排序键值存储,每当排序的数据范围达到一定大小时,它就会重新分配。

我认为公认的解决方案还有另一个问题。 如果您有多个副本,则不能保证插入按顺序结束。

来自 datastax 文档:

now() - In the coordinator node, generates a new unique timeuuid in milliseconds when the statement is executed. The timestamp portion of the timeuuid conforms to the UTC (Universal Time) standard. This method is useful for inserting values. The value returned by now() is guaranteed to be unique.

当您有多个副本时,您也有多个协调器节点,因为可以选择任何节点作为协调器节点。这意味着由于节点上时间的任何微小变化,您的插入没有顺序。 因此,实际上稍后发生在您的参考框架中的一个插入可能会排在前一个插入的记录之前,因为 now() 只是在协调器节点上生成一个日期,该日期稍晚一些。

您正试图对您的数据获得一些一致的(或对事实的单一参考)观点。不幸的是,在分布式环境中,没有对真相的单一引用。