使用具有 Java 8 个并行流的 Datastax Cassandra 结果集 - 快速
Using a Datastax Cassandra ResultSet with Java 8 Parallel Streams - Quickly
我正在使用 Datastax 驱动程序从 Cassandra 获取大量行,我需要尽快处理它们。
我研究过使用 List::parallelStream().forEach()
,起初看起来不错,因为 ResultSet
的行为很像 List
,但遗憾的是我无法直接使用 parallelStream()
在 ResultSet
上。为了让它工作,我首先必须使用 ResultSet::all()
这真的很慢 - 我假设它遍历每个元素。
ResultSet rs = this.getResultSet(); // Takes <1 second
// Convert the ResultSet to a list so as I can use parallelStream().
List<Row> rsList = rs.all(); // Takes 21 seconds
rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds
有没有更快的方法可以处理结果集的每一行?
To get this to work I first have to use ResultSet::all() which really is slow
ResultSet.all()
将使用服务器端 分页 获取 所有行 。您可以使用 statement.setFetchSize()
控制页面大小
Is there any faster way I can process each row of the result set?
这取决于您的查询,它是什么?如果你正在做一个完整的分区扫描,只有几台机器在做这项工作,但如果你从多个分区获取数据,你可以尝试用多个查询来并行化它们,每个分区一个
你可以试试这个:
ResultSet rs = this.getResultSet(); // Takes <1 second
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
rs.iterator(), Spliterator.ORDERED), false)
.parallel().forEach(this::processRow);
省略对 rs.all()
的调用
希望,如果 ResultSet
允许立即开始迭代,您将能够更早地并行处理。
更新
检查 ResultSet
的来源后,这是我看到的:
方法 all()
创建一个新的 ArrayList
并填充它,在您的情况下需要 21 秒
List<Row> result = new ArrayList<Row>(rows.size());
for (Row row : this)
result.add(row);
迭代器中实现的方法next()
改为轮询行队列
public Row next() {
return Row.fromData(metadata, rows.poll());
}
这意味着数据处理不需要等待 21 秒就可以开始处理第一行。
与作者描述的结果几乎相同。
我的解决方案是将 FetchSize 设置为更大的值。正如我所读,默认值为 5000。获取所有内容并遍历它对我来说花费了大约 25 秒。使用 .setFetchSize(50000) 迭代需要 0.8 秒。我什至不相信它仍然。用简单的 foreach 循环迭代
我的代码:
String sql = "...."
prepearedSql = session.prepare(sql);
Statement statement = prepearedSql.bind().setValues(...).setFetchSize(50000);
ResultSet result = session.execute(statement);
for (Row row : result)
{...
我正在使用 Datastax 驱动程序从 Cassandra 获取大量行,我需要尽快处理它们。
我研究过使用 List::parallelStream().forEach()
,起初看起来不错,因为 ResultSet
的行为很像 List
,但遗憾的是我无法直接使用 parallelStream()
在 ResultSet
上。为了让它工作,我首先必须使用 ResultSet::all()
这真的很慢 - 我假设它遍历每个元素。
ResultSet rs = this.getResultSet(); // Takes <1 second
// Convert the ResultSet to a list so as I can use parallelStream().
List<Row> rsList = rs.all(); // Takes 21 seconds
rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds
有没有更快的方法可以处理结果集的每一行?
To get this to work I first have to use ResultSet::all() which really is slow
ResultSet.all()
将使用服务器端 分页 获取 所有行 。您可以使用 statement.setFetchSize()
Is there any faster way I can process each row of the result set?
这取决于您的查询,它是什么?如果你正在做一个完整的分区扫描,只有几台机器在做这项工作,但如果你从多个分区获取数据,你可以尝试用多个查询来并行化它们,每个分区一个
你可以试试这个:
ResultSet rs = this.getResultSet(); // Takes <1 second
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
rs.iterator(), Spliterator.ORDERED), false)
.parallel().forEach(this::processRow);
省略对 rs.all()
希望,如果 ResultSet
允许立即开始迭代,您将能够更早地并行处理。
更新
检查 ResultSet
的来源后,这是我看到的:
方法 all()
创建一个新的 ArrayList
并填充它,在您的情况下需要 21 秒
List<Row> result = new ArrayList<Row>(rows.size());
for (Row row : this)
result.add(row);
迭代器中实现的方法next()
改为轮询行队列
public Row next() {
return Row.fromData(metadata, rows.poll());
}
这意味着数据处理不需要等待 21 秒就可以开始处理第一行。
与作者描述的结果几乎相同。 我的解决方案是将 FetchSize 设置为更大的值。正如我所读,默认值为 5000。获取所有内容并遍历它对我来说花费了大约 25 秒。使用 .setFetchSize(50000) 迭代需要 0.8 秒。我什至不相信它仍然。用简单的 foreach 循环迭代
我的代码:
String sql = "...."
prepearedSql = session.prepare(sql);
Statement statement = prepearedSql.bind().setValues(...).setFetchSize(50000);
ResultSet result = session.execute(statement);
for (Row row : result)
{...