如何使用 golang 查询将数据流式传输到 Cassandra

How to stream data with golang query to Cassandra

我有以下代码:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users LIMIT 9999999999;`).Iter()
c :=iter.Columns()
scanArgs := make([]interface{}, len(c))

for i:=0; i < len(scanArgs); i++ {
    scanArgs[i] = makeType(c[i])
}

for iter.Scan(scanArgs...) { ... }

问题 是 table 中的行太多了。但我需要阅读所有这些,将数据迁移到另一个数据库。有没有办法 stream 来自 Cassandra 的数据?不幸的是,我们没有 table 的主键序列,我们使用 uuid 作为 PK。所以这意味着我们不能使用 2 个 for 循环的简单技术,一个递增计数器并以这种方式遍历所有行。

Gocql 有一些分页选项(假设你的 Cassandra 版本至少是版本 2)。

Gocql 的 Session 有一个方法 SetPageSize

Gocql的Query也有类似的方法,PageSize

这可能会帮助您分解查询。代码如下所示:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()

iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(5000).Iter()

// use the iter as usual to iterate over all results 
// this will send additional CQL queries when it needs to get new pages

对于纯 Cassandra 方法来执行此操作,您 可以 运行 对令牌范围进行范围查询,因为它们按节点分解。

首先,找到令牌范围。:

$ nodetool ring

Datacenter: dc1
==========
Address   Rack       Status State   Load         Owns                Token
                                                                      8961648479018332584
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -9023369133424793632
10.1.4.1  rack2      Up     Normal  1.56 GB      100.00%             -7946127339777435347
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -7847456805881540087
...

等...(这可能很大,取决于节点数量和每个节点上的代币

然后调整查询以在分区键上使用 token() 函数。由于我不知道你的 PRIMARY KEY 定义是什么,我将猜测并使用 users_id 作为分区键:

SELECT * FROM cmuser.users
WHERE token(users_id) > 8961648479018332584
  AND token(users_id) <= -9023369133424793632;

完成后,移动到下一个标记范围:

SELECT * FROM cmuser.users
WHERE token(users_id) > -9023369133424793632
  AND token(users_id) <= -7946127339777435347;

像这样分解您的查询将有助于确保它一次只从一个节点读取。这应该允许查询从集群(和磁盘)顺序读取数据,而不用担心超时。