Cassandra datastax 驱动 ResultSet 在多个线程中共享以实现快速读取
Cassandra datastax driver ResultSet sharing in multiple threads for fast reading
我在 cassandra 中有巨大的表,超过 20 亿行并且还在增加。这些行有一个日期字段,它遵循日期桶模式以限制每一行。
即便如此,对于某个特定日期,我也有超过一百万的条目。
我想尽快读取和处理每一天的行。我正在做的是获取 com.datastax.driver.core.ResultSet
的实例并从中获取迭代器并在多个线程之间共享该迭代器。
所以,基本上我想增加读取吞吐量。这是正确的方法吗?如果没有,请提出更好的方法。
很遗憾,您不能按原样执行此操作。原因是 ResultSet 提供了一个 internal paging state 用于一次检索 1 页的行。
但是您确实有选择。由于我想您正在执行范围查询(跨多个分区的查询),因此您可以使用一种策略,即使用令牌指令一次提交跨令牌范围的多个查询。 Paging through unordered partitioner results.
中记录了一个很好的例子
java-driver 2.0.10 和 2.1.5 各自提供了一种从主机和 splitting them. There is an example of how to do this in the java-driver's integration tests in TokenRangeIntegrationTest.java#should_expose_token_ranges():
检索令牌范围的机制
PreparedStatement rangeStmt = session.prepare("SELECT i FROM foo WHERE token(i) > ? and token(i) <= ?");
TokenRange foundRange = null;
for (TokenRange range : metadata.getTokenRanges()) {
List<Row> rows = rangeQuery(rangeStmt, range);
for (Row row : rows) {
if (row.getInt("i") == testKey) {
// We should find our test key exactly once
assertThat(foundRange)
.describedAs("found the same key in two ranges: " + foundRange + " and " + range)
.isNull();
foundRange = range;
// That range should be managed by the replica
assertThat(metadata.getReplicas("test", range)).contains(replica);
}
}
}
assertThat(foundRange).isNotNull();
}
...
private List<Row> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
List<Row> rows = Lists.newArrayList();
for (TokenRange subRange : range.unwrap()) {
Statement statement = rangeStmt.bind(subRange.getStart(), subRange.getEnd());
rows.addAll(session.execute(statement).all());
}
return rows;
}
您基本上可以生成您的语句并以异步方式提交它们,上面的示例只是一次迭代一个语句。
另一种选择是使用spark-cassandra-connector, which essentially does this under the covers and in a very efficient way. I find it very easy to use and you don't even need to set up a spark cluster to use it. See this document如何使用JavaAPI。
我在 cassandra 中有巨大的表,超过 20 亿行并且还在增加。这些行有一个日期字段,它遵循日期桶模式以限制每一行。
即便如此,对于某个特定日期,我也有超过一百万的条目。
我想尽快读取和处理每一天的行。我正在做的是获取 com.datastax.driver.core.ResultSet
的实例并从中获取迭代器并在多个线程之间共享该迭代器。
所以,基本上我想增加读取吞吐量。这是正确的方法吗?如果没有,请提出更好的方法。
很遗憾,您不能按原样执行此操作。原因是 ResultSet 提供了一个 internal paging state 用于一次检索 1 页的行。
但是您确实有选择。由于我想您正在执行范围查询(跨多个分区的查询),因此您可以使用一种策略,即使用令牌指令一次提交跨令牌范围的多个查询。 Paging through unordered partitioner results.
中记录了一个很好的例子java-driver 2.0.10 和 2.1.5 各自提供了一种从主机和 splitting them. There is an example of how to do this in the java-driver's integration tests in TokenRangeIntegrationTest.java#should_expose_token_ranges():
检索令牌范围的机制 PreparedStatement rangeStmt = session.prepare("SELECT i FROM foo WHERE token(i) > ? and token(i) <= ?");
TokenRange foundRange = null;
for (TokenRange range : metadata.getTokenRanges()) {
List<Row> rows = rangeQuery(rangeStmt, range);
for (Row row : rows) {
if (row.getInt("i") == testKey) {
// We should find our test key exactly once
assertThat(foundRange)
.describedAs("found the same key in two ranges: " + foundRange + " and " + range)
.isNull();
foundRange = range;
// That range should be managed by the replica
assertThat(metadata.getReplicas("test", range)).contains(replica);
}
}
}
assertThat(foundRange).isNotNull();
}
...
private List<Row> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
List<Row> rows = Lists.newArrayList();
for (TokenRange subRange : range.unwrap()) {
Statement statement = rangeStmt.bind(subRange.getStart(), subRange.getEnd());
rows.addAll(session.execute(statement).all());
}
return rows;
}
您基本上可以生成您的语句并以异步方式提交它们,上面的示例只是一次迭代一个语句。
另一种选择是使用spark-cassandra-connector, which essentially does this under the covers and in a very efficient way. I find it very easy to use and you don't even need to set up a spark cluster to use it. See this document如何使用JavaAPI。