如何有效地批量处理来自 Cassandra 的 select 数据?
How to batch select data from Cassandra effectively?
我知道Cassandra不支持批量查询,也不推荐使用IN
,因为它会降低性能。但是我必须通过id来获取数据,例如:
select * from visit where id in ([visit_id array])
desc table:
CREATE TABLE visit (
enterprise_id int,
id text,
........
PRIMARY KEY (enterprise_id, id)
数组可能有数千个元素。有什么办法可以让它有效吗?
大 在查询中创建 GC 暂停和堆压力,导致整体性能下降。当您执行大量查询时,这意味着您正在等待这个单个协调器节点给您响应,它将所有这些查询及其响应保存在堆中,如果其中一个查询失败,或者协调器失败,您有重试整个事情。
方法 1:
尝试将您的 in 查询转换为范围查询 (>=, <=)
SELECT * visit WHERE enterprise_id = ? and id >= ? and id <= ?
方法二:
使用 executeAsync,Java 示例
PreparedStatement statement = session.prepare("SELECT * FROM visit where enterprise_id = ? and id = ?");
List<ResultSetFuture> futures = new ArrayList<>();
for (int i = 1; i < 4; i++) {
ResultSetFuture resultSetFuture = session.executeAsync(statement.bind(i, i));
futures.add(resultSetFuture);
}
List<String> results = new ArrayList<>();
for (ResultSetFuture future : futures){
ResultSet rows = future.getUninterruptibly();
Row row = rows.one();
results.add(row.getString("name"));
}
return results;
方法 3:
如果可能,则创建另一个 table,而不是在查询中,当您将在查询中执行的数据即将插入或更新时,也将数据插入到新的 table,然后您可以只从新的 table 查询而不用查询
来源:
http://www.datastax.com/dev/blog/a-deep-look-to-the-cql-where-clause
https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/
我提出此类查询的首选方式是 展开 IN
部分。这仅仅意味着您需要并行发出多个查询,因为 token-o-matic(又名令牌感知)驱动程序会将每个查询视为一个独立的查询,然后将将这些分布在不同的节点之间,使每个单个节点成为负责将要达到的每个查询的协调器。
您应该 运行 最多 X 个查询并等待至少其中一个完成(我使用 Java):
final int X = 1000;
ArrayList<ResultSetFuture> futures = new ArrayList<>();
ArrayList<ResultSet> results = new ArrayList<>();
for (int i = 0; i < allTheRowsINeedToFetch; i++) {
futures.add(session.executeAsync(myBeautifulPreparedStatement.bind(xxx,yyy,zzz)));
while (futures.size() >= X || (futures.size() > 0 && futures.get(0).isDone())) {
ResultSetFuture rsf = futures.remove(0);
results.add(rsf.getUninterruptibly());
}
}
while (futures.size() > 0) {
ResultSetFuture rsf = futures.remove(0);
results.add(rsf.getUninterruptibly());
}
// Now use the results
这被称为 背压,它用于将压力从集群转移到客户端.
此方法的优点在于您可以真正并行 (X = allTheRowsINeedToFetch),也可以真正串行 (X = 1),介于两者之间的一切仅取决于 您的 集群硬件。 X 的低值意味着你没有使用你的集群功能 足够,高值意味着你会打电话找麻烦,因为你会开始查看超时。所以,你真的需要调整它。
我知道Cassandra不支持批量查询,也不推荐使用IN
,因为它会降低性能。但是我必须通过id来获取数据,例如:
select * from visit where id in ([visit_id array])
desc table:
CREATE TABLE visit (
enterprise_id int,
id text,
........
PRIMARY KEY (enterprise_id, id)
数组可能有数千个元素。有什么办法可以让它有效吗?
大 在查询中创建 GC 暂停和堆压力,导致整体性能下降。当您执行大量查询时,这意味着您正在等待这个单个协调器节点给您响应,它将所有这些查询及其响应保存在堆中,如果其中一个查询失败,或者协调器失败,您有重试整个事情。
方法 1:
尝试将您的 in 查询转换为范围查询 (>=, <=)
SELECT * visit WHERE enterprise_id = ? and id >= ? and id <= ?
方法二:
使用 executeAsync,Java 示例
PreparedStatement statement = session.prepare("SELECT * FROM visit where enterprise_id = ? and id = ?");
List<ResultSetFuture> futures = new ArrayList<>();
for (int i = 1; i < 4; i++) {
ResultSetFuture resultSetFuture = session.executeAsync(statement.bind(i, i));
futures.add(resultSetFuture);
}
List<String> results = new ArrayList<>();
for (ResultSetFuture future : futures){
ResultSet rows = future.getUninterruptibly();
Row row = rows.one();
results.add(row.getString("name"));
}
return results;
方法 3:
如果可能,则创建另一个 table,而不是在查询中,当您将在查询中执行的数据即将插入或更新时,也将数据插入到新的 table,然后您可以只从新的 table 查询而不用查询
来源:
http://www.datastax.com/dev/blog/a-deep-look-to-the-cql-where-clause
https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/
我提出此类查询的首选方式是 展开 IN
部分。这仅仅意味着您需要并行发出多个查询,因为 token-o-matic(又名令牌感知)驱动程序会将每个查询视为一个独立的查询,然后将将这些分布在不同的节点之间,使每个单个节点成为负责将要达到的每个查询的协调器。
您应该 运行 最多 X 个查询并等待至少其中一个完成(我使用 Java):
final int X = 1000;
ArrayList<ResultSetFuture> futures = new ArrayList<>();
ArrayList<ResultSet> results = new ArrayList<>();
for (int i = 0; i < allTheRowsINeedToFetch; i++) {
futures.add(session.executeAsync(myBeautifulPreparedStatement.bind(xxx,yyy,zzz)));
while (futures.size() >= X || (futures.size() > 0 && futures.get(0).isDone())) {
ResultSetFuture rsf = futures.remove(0);
results.add(rsf.getUninterruptibly());
}
}
while (futures.size() > 0) {
ResultSetFuture rsf = futures.remove(0);
results.add(rsf.getUninterruptibly());
}
// Now use the results
这被称为 背压,它用于将压力从集群转移到客户端.
此方法的优点在于您可以真正并行 (X = allTheRowsINeedToFetch),也可以真正串行 (X = 1),介于两者之间的一切仅取决于 您的 集群硬件。 X 的低值意味着你没有使用你的集群功能 足够,高值意味着你会打电话找麻烦,因为你会开始查看超时。所以,你真的需要调整它。