如何使用 golang 查询将数据流式传输到 Cassandra
How to stream data with golang query to Cassandra
我有以下代码:
cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users LIMIT 9999999999;`).Iter()
c :=iter.Columns()
scanArgs := make([]interface{}, len(c))
for i:=0; i < len(scanArgs); i++ {
scanArgs[i] = makeType(c[i])
}
for iter.Scan(scanArgs...) { ... }
问题 是 table 中的行太多了。但我需要阅读所有这些,将数据迁移到另一个数据库。有没有办法 stream 来自 Cassandra 的数据?不幸的是,我们没有 table 的主键序列,我们使用 uuid 作为 PK。所以这意味着我们不能使用 2 个 for 循环的简单技术,一个递增计数器并以这种方式遍历所有行。
Gocql 有一些分页选项(假设你的 Cassandra 版本至少是版本 2)。
Gocql 的 Session
有一个方法 SetPageSize
Gocql的Query
也有类似的方法,PageSize
这可能会帮助您分解查询。代码如下所示:
cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(5000).Iter()
// use the iter as usual to iterate over all results
// this will send additional CQL queries when it needs to get new pages
对于纯 Cassandra 方法来执行此操作,您 可以 运行 对令牌范围进行范围查询,因为它们按节点分解。
首先,找到令牌范围。:
$ nodetool ring
Datacenter: dc1
==========
Address Rack Status State Load Owns Token
8961648479018332584
10.1.4.3 rack3 Up Normal 1.34 GB 100.00% -9023369133424793632
10.1.4.1 rack2 Up Normal 1.56 GB 100.00% -7946127339777435347
10.1.4.3 rack3 Up Normal 1.34 GB 100.00% -7847456805881540087
...
等...(这可能很大,取决于节点数量和每个节点上的代币)
然后调整查询以在分区键上使用 token()
函数。由于我不知道你的 PRIMARY KEY 定义是什么,我将猜测并使用 users_id
作为分区键:
SELECT * FROM cmuser.users
WHERE token(users_id) > 8961648479018332584
AND token(users_id) <= -9023369133424793632;
完成后,移动到下一个标记范围:
SELECT * FROM cmuser.users
WHERE token(users_id) > -9023369133424793632
AND token(users_id) <= -7946127339777435347;
像这样分解您的查询将有助于确保它一次只从一个节点读取。这应该允许查询从集群(和磁盘)顺序读取数据,而不用担心超时。
我有以下代码:
cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users LIMIT 9999999999;`).Iter()
c :=iter.Columns()
scanArgs := make([]interface{}, len(c))
for i:=0; i < len(scanArgs); i++ {
scanArgs[i] = makeType(c[i])
}
for iter.Scan(scanArgs...) { ... }
问题 是 table 中的行太多了。但我需要阅读所有这些,将数据迁移到另一个数据库。有没有办法 stream 来自 Cassandra 的数据?不幸的是,我们没有 table 的主键序列,我们使用 uuid 作为 PK。所以这意味着我们不能使用 2 个 for 循环的简单技术,一个递增计数器并以这种方式遍历所有行。
Gocql 有一些分页选项(假设你的 Cassandra 版本至少是版本 2)。
Gocql 的 Session
有一个方法 SetPageSize
Gocql的Query
也有类似的方法,PageSize
这可能会帮助您分解查询。代码如下所示:
cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(5000).Iter()
// use the iter as usual to iterate over all results
// this will send additional CQL queries when it needs to get new pages
对于纯 Cassandra 方法来执行此操作,您 可以 运行 对令牌范围进行范围查询,因为它们按节点分解。
首先,找到令牌范围。:
$ nodetool ring
Datacenter: dc1
==========
Address Rack Status State Load Owns Token
8961648479018332584
10.1.4.3 rack3 Up Normal 1.34 GB 100.00% -9023369133424793632
10.1.4.1 rack2 Up Normal 1.56 GB 100.00% -7946127339777435347
10.1.4.3 rack3 Up Normal 1.34 GB 100.00% -7847456805881540087
...
等...(这可能很大,取决于节点数量和每个节点上的代币)
然后调整查询以在分区键上使用 token()
函数。由于我不知道你的 PRIMARY KEY 定义是什么,我将猜测并使用 users_id
作为分区键:
SELECT * FROM cmuser.users
WHERE token(users_id) > 8961648479018332584
AND token(users_id) <= -9023369133424793632;
完成后,移动到下一个标记范围:
SELECT * FROM cmuser.users
WHERE token(users_id) > -9023369133424793632
AND token(users_id) <= -7946127339777435347;
像这样分解您的查询将有助于确保它一次只从一个节点读取。这应该允许查询从集群(和磁盘)顺序读取数据,而不用担心超时。