如何有效地批量处理来自 Cassandra 的 select 数据?

How to batch select data from Cassandra effectively?

我知道Cassandra不支持批量查询,也不推荐使用IN,因为它会降低性能。但是我必须通过id来获取数据,例如:

select * from visit where id in ([visit_id array])

desc table:

CREATE TABLE visit (
    enterprise_id int,
    id text,
    ........
    PRIMARY KEY (enterprise_id, id)

数组可能有数千个元素。有什么办法可以让它有效吗?

大 在查询中创建 GC 暂停和堆压力,导致整体性能下降。当您执行大量查询时,这意味着您正在等待这个单个协调器节点给您响应,它将所有这些查询及其响应保存在堆中,如果其中一个查询失败,或者协调器失败,您有重试整个事情。

方法 1:

尝试将您的 in 查询转换为范围查询 (>=, <=)

SELECT * visit WHERE enterprise_id = ? and id >= ? and id <= ?

方法二:

使用 executeAsync,Java 示例

PreparedStatement statement = session.prepare("SELECT * FROM visit where enterprise_id = ? and id = ?");

List<ResultSetFuture> futures = new ArrayList<>();
for (int i = 1; i < 4; i++) {
    ResultSetFuture resultSetFuture = session.executeAsync(statement.bind(i, i));
    futures.add(resultSetFuture);
}

List<String> results = new ArrayList<>();
for (ResultSetFuture future : futures){
     ResultSet rows = future.getUninterruptibly();
     Row row = rows.one();
     results.add(row.getString("name"));
}
return results; 

方法 3:

如果可能,则创建另一个 table,而不是在查询中,当您将在查询中执行的数据即将插入或更新时,也将数据插入到新的 table,然后您可以只从新的 table 查询而不用查询

来源:
http://www.datastax.com/dev/blog/a-deep-look-to-the-cql-where-clause https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/

我提出此类查询的首选方式是 展开 IN 部分。这仅仅意味着您需要并行发出多个查询,因为 token-o-matic(又名令牌感知)驱动程序会将每个查询视为一个独立的查询,然后将将这些分布在不同的节点之间,使每个单个节点成为负责将要达到的每个查询的协调器。

您应该 运行 最多 X 个查询并等待至少其中一个完成(我使用 Java):

final int X = 1000;
ArrayList<ResultSetFuture> futures = new ArrayList<>();
ArrayList<ResultSet> results = new ArrayList<>();
for (int i = 0; i < allTheRowsINeedToFetch; i++) {
    futures.add(session.executeAsync(myBeautifulPreparedStatement.bind(xxx,yyy,zzz)));
    while (futures.size() >= X || (futures.size() > 0 && futures.get(0).isDone())) {
        ResultSetFuture rsf = futures.remove(0);
        results.add(rsf.getUninterruptibly());
    }
}

while (futures.size() > 0) {
    ResultSetFuture rsf = futures.remove(0);
    results.add(rsf.getUninterruptibly());
}

// Now use the results

这被称为 背压,它用于压力从集群转移到客户端.

此方法的优点在于您可以真正并行 (X = allTheRowsINeedToFetch),也可以真正串行 (X = 1),介于两者之间的一切仅取决于 您的 集群硬件。 X 的低值意味着你没有使用你的集群功能 足够,高值意味着你会打电话找麻烦,因为你会开始查看超时。所以,你真的需要调整它。