使用具有 Java 8 个并行流的 Datastax Cassandra 结果集 - 快速

Using a Datastax Cassandra ResultSet with Java 8 Parallel Streams - Quickly

我正在使用 Datastax 驱动程序从 Cassandra 获取大量行,我需要尽快处理它们。

我研究过使用 List::parallelStream().forEach(),起初看起来不错,因为 ResultSet 的行为很像 List,但遗憾的是我无法直接使用 parallelStream()ResultSet 上。为了让它工作,我首先必须使用 ResultSet::all() 这真的很慢 - 我假设它遍历每个元素。

ResultSet rs = this.getResultSet(); // Takes <1 second

// Convert the ResultSet to a list so as I can use parallelStream().
List<Row> rsList = rs.all(); // Takes 21 seconds

rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds

有没有更快的方法可以处理结果集的每一行?

To get this to work I first have to use ResultSet::all() which really is slow

ResultSet.all() 将使用服务器端 分页 获取 所有行 。您可以使用 statement.setFetchSize()

控制页面大小

Is there any faster way I can process each row of the result set?

这取决于您的查询,它是什么?如果你正在做一个完整的分区扫描,只有几台机器在做这项工作,但如果你从多个分区获取数据,你可以尝试用多个查询来并行化它们,每个分区一个

你可以试试这个:

ResultSet rs = this.getResultSet(); // Takes <1 second

StreamSupport.stream(
    Spliterators.spliteratorUnknownSize(
                rs.iterator(), Spliterator.ORDERED), false)
       .parallel().forEach(this::processRow);

省略对 rs.all()

的调用

希望,如果 ResultSet 允许立即开始迭代,您将能够更早地并行处理。

更新

检查 ResultSet 的来源后,这是我看到的:

方法 all() 创建一个新的 ArrayList 并填充它,在您的情况下需要 21 秒

List<Row> result = new ArrayList<Row>(rows.size());
for (Row row : this)
    result.add(row);

迭代器中实现的方法next()改为轮询行队列

public Row next() {
    return Row.fromData(metadata, rows.poll());
}

这意味着数据处理不需要等待 21 秒就可以开始处理第一行。

与作者描述的结果几乎相同。 我的解决方案是将 FetchSize 设置为更大的值。正如我所读,默认值为 5000。获取所有内容并遍历它对我来说花费了大约 25 秒。使用 .setFetchSize(50000) 迭代需要 0.8 秒。我什至不相信它仍然。用简单的 foreach 循环迭代

我的代码:

String sql = "...."
prepearedSql = session.prepare(sql);
Statement statement = prepearedSql.bind().setValues(...).setFetchSize(50000);
ResultSet result = session.execute(statement);
for (Row row : result)
    {...