如何使用 DataStax Java 驱动程序在 Cassandra 中高效地使用预处理语句(prepared statement)?
How to use prepared statement efficiently using datastax java driver in Cassandra?
我需要使用 Datastax Java 驱动程序查询 Cassandra 中的一个表。下面是我的代码,它工作正常 -
/**
 * Singleton wrapper around a Cassandra {@code Cluster} and the {@code Session}
 * opened from it.
 *
 * <p>The single instance is created by the nested {@code ConnectionHolder}
 * class (initialization-on-demand holder) and is obtained through
 * {@link #getInstance()}.
 */
public class TestCassandra {
    private Session session = null;
    private Cluster cluster = null;

    /** Holds the one and only instance; it is created when this holder class is initialized. */
    private static class ConnectionHolder {
        static final TestCassandra connection = new TestCassandra();
    }

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code TestCassandra}
     */
    public static TestCassandra getInstance() {
        return ConnectionHolder.connection;
    }

    /**
     * Builds the cluster for contact point {@code 127.0.0.1} with the retry,
     * pooling, load-balancing and reconnection settings below, then connects.
     */
    private TestCassandra() {
        Builder clusterBuilder = Cluster.builder();
        clusterBuilder.addContactPoints("127.0.0.1");

        // Re-apply the LOCAL core-connection count that the fresh options object reports.
        PoolingOptions pooling = new PoolingOptions();
        int localCoreConnections = pooling.getCoreConnectionsPerHost(HostDistance.LOCAL);
        pooling.setCoreConnectionsPerHost(HostDistance.LOCAL, localCoreConnections);

        cluster = clusterBuilder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withPoolingOptions(pooling)
                .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();
        session = cluster.connect();
    }

    /**
     * Reads every row of {@code testkeyspace.test_table_0} through
     * {@code testkeyspace.test_table_13} at QUORUM consistency and collects the
     * {@code user_id} column values.
     *
     * <p>A failure while querying one table is printed to standard output and
     * the loop moves on to the next table.
     *
     * @return the distinct {@code user_id} values that were read
     */
    private Set<String> getRandomUsers() {
        Set<String> userIds = new HashSet<String>();
        for (int tableIndex = 0; tableIndex < 14; tableIndex++) {
            String cql = "select * from testkeyspace.test_table_" + tableIndex + ";";
            try {
                SimpleStatement statement = new SimpleStatement(cql);
                statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet result = session.execute(statement);
                for (Row row : result) {
                    userIds.add(row.getString("user_id"));
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }
        return userIds;
    }
}
我在我的主应用程序中像这样使用上面的 class -
// Obtain the singleton and run the user lookup on it.
// NOTE(review): getRandomUsers() is declared private in TestCassandra, so this
// call only compiles from inside that class — confirm the intended visibility.
TestCassandra.getInstance().getRandomUsers();
有什么方法可以在 getRandomUsers 中高效地使用 PreparedStatement 吗?我想我需要确保每条 PreparedStatement 只创建一次,而不是重复创建。在我当前的架构中,最好的设计是什么?我该如何使用它?
您可以创建所需语句的缓存(这是一个相当基本的示例,可以让您了解)。让我们从创建将用作缓存的 class 开始。
/**
 * Minimal cache of prepared statements keyed by their CQL text.
 *
 * <p>Uses {@code session} from the enclosing scope to prepare statements.
 * Backed by a plain {@link HashMap}, so it is not safe for concurrent use.
 */
private class StatementCache {
    Map<String, PreparedStatement> statementCache = new HashMap<>();

    /**
     * Returns a fresh bound statement for the given CQL, preparing and caching
     * the statement the first time that CQL is seen.
     *
     * @param cql the CQL text, also used as the cache key
     * @return a new {@code BoundStatement} created from the cached prepared statement
     */
    public BoundStatement getStatement(String cql) {
        PreparedStatement prepared = statementCache.get(cql);
        if (prepared != null) {
            // Cache hit: only a new binding is needed.
            return prepared.bind();
        }
        // Cache miss: prepare once and remember it for later calls.
        prepared = session.prepare(cql);
        statementCache.put(cql, prepared);
        return prepared.bind();
    }
}
然后给你的单例添加一个实例:
// Snippet: the singleton class with one extra field holding the statement cache.
public class TestCassandra {
private Session session = null;
private Cluster cluster = null;
// Cache used to look up statements by their CQL text.
private StatementCache psCache = new StatementCache();
// rest of class...
最后使用函数中的缓存:
/**
 * Variant of the lookup that takes the CQL text and executes it through the
 * prepared-statement cache at QUORUM consistency.
 *
 * <p>This is an excerpt; the remainder of the method is elided.
 *
 * @param cql the CQL text to execute; also the key used by the cache
 */
private Set<String> getRandomUsers(String cql) {
    // lots of code.
    try {
        // abstract the handling of the cache to its own class.
        // this will need some work to make sure it's thread safe
        // as currently it's not.
        // QUORUM must be set on the statement that is actually executed; setting it
        // on a separate SimpleStatement has no effect on the cached bound statement.
        ResultSet res = session.execute(
                psCache.getStatement(cql).setConsistencyLevel(ConsistencyLevel.QUORUM));
我的实现与上面分享的大致相同,但增加了性能方面的检查,并处理了竞态条件(race conditions)。我的思路请参见代码中的内联注释。
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import nl.ing.creditcards.commons.activity.ActivityException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide cache of prepared statements keyed by their CQL text.
 *
 * <p>The single instance is created lazily by {@link #getInstance()} using
 * double-checked locking on a {@code volatile} field. Cached statements live
 * in a static {@link ConcurrentHashMap}.
 */
public class StatementCache {
    /* volatile so that a published instance is visible to every thread */
    private static volatile StatementCache sCacheInstance;
    private static final Map<String, PreparedStatement> holder = new ConcurrentHashMap<>();
    private static final String NOT_PERMITTED = "Operation not permitted";

    /**
     * Refuses construction once an instance already exists, e.g. when the
     * constructor is invoked through reflection.
     */
    private StatementCache() {
        if (sCacheInstance != null) {
            throw new ActivityException(NOT_PERMITTED, "Use getInstance() to retrieve the instance of this class");
        }
    }

    /**
     * Returns the singleton, creating it on first use.
     *
     * @return the shared {@code StatementCache}
     */
    public static StatementCache getInstance() {
        // Fast path: already created, no locking required.
        if (sCacheInstance != null) {
            return sCacheInstance;
        }
        synchronized (StatementCache.class) {
            // Re-check under the lock so that only one thread constructs the instance.
            if (sCacheInstance == null) {
                sCacheInstance = new StatementCache();
            }
        }
        return sCacheInstance;
    }

    /**
     * Returns the prepared statement cached for {@code cql}, preparing it with
     * {@code session} and storing it if there is none yet.
     *
     * <p>A cache hit takes no lock. On a miss the lookup is repeated while
     * holding this instance's monitor, so callers racing on the same CQL do not
     * both prepare it. The cache key is the CQL text only; {@code session} is
     * used solely when a statement has to be prepared.
     *
     * @param session the session used to prepare the statement on a cache miss
     * @param cql the CQL text, also used as the cache key
     * @return the cached or newly prepared statement
     */
    public PreparedStatement getStatement(Session session, String cql) {
        PreparedStatement cached = holder.get(cql);
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Another thread may have prepared it while this one waited for the lock.
            cached = holder.get(cql);
            if (cached == null) {
                cached = session.prepare(cql);
                holder.put(cql, cached);
            }
            return cached;
        }
    }
}
使用这个缓存单例 class 会很简单:
/**
 * Example client of {@code StatementCache}: keeps the session it was given and
 * fetches prepared statements through the cache singleton.
 */
public class CacheConsumer{
    // Instance field rather than static: a static field written through the
    // constructor would let each new consumer replace the session of all others.
    private final Session session;

    /**
     * @param session the session passed to the cache when a statement must be prepared
     */
    CacheConsumer(Session session){
        this.session = session;
    }

    /**
     * Looks up the prepared statement for a fixed query via the cache.
     */
    public void someMethod(){
        String cqlstatement = "SELECT * FROM SOME_TABLE";
        PreparedStatement statement =
                StatementCache.getInstance().getStatement(session, cqlstatement);
        // You can now use the prepared statement however you wish.
    }
}
很简单 ;)
我需要使用 Datastax Java 驱动程序查询 Cassandra 中的一个表。下面是我的代码,它工作正常 -
/**
 * Singleton wrapper around a Cassandra {@code Cluster} and the {@code Session}
 * opened from it.
 *
 * <p>The single instance is created by the nested {@code ConnectionHolder}
 * class (initialization-on-demand holder) and is obtained through
 * {@link #getInstance()}.
 */
public class TestCassandra {
    private Session session = null;
    private Cluster cluster = null;

    /** Holds the one and only instance; it is created when this holder class is initialized. */
    private static class ConnectionHolder {
        static final TestCassandra connection = new TestCassandra();
    }

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code TestCassandra}
     */
    public static TestCassandra getInstance() {
        return ConnectionHolder.connection;
    }

    /**
     * Builds the cluster for contact point {@code 127.0.0.1} with the retry,
     * pooling, load-balancing and reconnection settings below, then connects.
     */
    private TestCassandra() {
        Builder clusterBuilder = Cluster.builder();
        clusterBuilder.addContactPoints("127.0.0.1");

        // Re-apply the LOCAL core-connection count that the fresh options object reports.
        PoolingOptions pooling = new PoolingOptions();
        int localCoreConnections = pooling.getCoreConnectionsPerHost(HostDistance.LOCAL);
        pooling.setCoreConnectionsPerHost(HostDistance.LOCAL, localCoreConnections);

        cluster = clusterBuilder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withPoolingOptions(pooling)
                .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();
        session = cluster.connect();
    }

    /**
     * Reads every row of {@code testkeyspace.test_table_0} through
     * {@code testkeyspace.test_table_13} at QUORUM consistency and collects the
     * {@code user_id} column values.
     *
     * <p>A failure while querying one table is printed to standard output and
     * the loop moves on to the next table.
     *
     * @return the distinct {@code user_id} values that were read
     */
    private Set<String> getRandomUsers() {
        Set<String> userIds = new HashSet<String>();
        for (int tableIndex = 0; tableIndex < 14; tableIndex++) {
            String cql = "select * from testkeyspace.test_table_" + tableIndex + ";";
            try {
                SimpleStatement statement = new SimpleStatement(cql);
                statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet result = session.execute(statement);
                for (Row row : result) {
                    userIds.add(row.getString("user_id"));
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }
        return userIds;
    }
}
我在我的主应用程序中像这样使用上面的 class -
// Obtain the singleton and run the user lookup on it.
// NOTE(review): getRandomUsers() is declared private in TestCassandra, so this
// call only compiles from inside that class — confirm the intended visibility.
TestCassandra.getInstance().getRandomUsers();
有什么方法可以在 getRandomUsers 中高效地使用 PreparedStatement 吗?我想我需要确保每条 PreparedStatement 只创建一次,而不是重复创建。在我当前的架构中,最好的设计是什么?我该如何使用它?
您可以创建所需语句的缓存(这是一个相当基本的示例,可以让您了解)。让我们从创建将用作缓存的 class 开始。
/**
 * Minimal cache of prepared statements keyed by their CQL text.
 *
 * <p>Uses {@code session} from the enclosing scope to prepare statements.
 * Backed by a plain {@link HashMap}, so it is not safe for concurrent use.
 */
private class StatementCache {
    Map<String, PreparedStatement> statementCache = new HashMap<>();

    /**
     * Returns a fresh bound statement for the given CQL, preparing and caching
     * the statement the first time that CQL is seen.
     *
     * @param cql the CQL text, also used as the cache key
     * @return a new {@code BoundStatement} created from the cached prepared statement
     */
    public BoundStatement getStatement(String cql) {
        PreparedStatement prepared = statementCache.get(cql);
        if (prepared != null) {
            // Cache hit: only a new binding is needed.
            return prepared.bind();
        }
        // Cache miss: prepare once and remember it for later calls.
        prepared = session.prepare(cql);
        statementCache.put(cql, prepared);
        return prepared.bind();
    }
}
然后给你的单例添加一个实例:
// Snippet: the singleton class with one extra field holding the statement cache.
public class TestCassandra {
private Session session = null;
private Cluster cluster = null;
// Cache used to look up statements by their CQL text.
private StatementCache psCache = new StatementCache();
// rest of class...
最后使用函数中的缓存:
/**
 * Variant of the lookup that takes the CQL text and executes it through the
 * prepared-statement cache at QUORUM consistency.
 *
 * <p>This is an excerpt; the remainder of the method is elided.
 *
 * @param cql the CQL text to execute; also the key used by the cache
 */
private Set<String> getRandomUsers(String cql) {
    // lots of code.
    try {
        // abstract the handling of the cache to its own class.
        // this will need some work to make sure it's thread safe
        // as currently it's not.
        // QUORUM must be set on the statement that is actually executed; setting it
        // on a separate SimpleStatement has no effect on the cached bound statement.
        ResultSet res = session.execute(
                psCache.getStatement(cql).setConsistencyLevel(ConsistencyLevel.QUORUM));
我的实现与上面分享的大致相同,但增加了性能方面的检查,并处理了竞态条件(race conditions)。我的思路请参见代码中的内联注释。
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import nl.ing.creditcards.commons.activity.ActivityException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide cache of prepared statements keyed by their CQL text.
 *
 * <p>The single instance is created lazily by {@link #getInstance()} using
 * double-checked locking on a {@code volatile} field. Cached statements live
 * in a static {@link ConcurrentHashMap}.
 */
public class StatementCache {
    /* volatile so that a published instance is visible to every thread */
    private static volatile StatementCache sCacheInstance;
    private static final Map<String, PreparedStatement> holder = new ConcurrentHashMap<>();
    private static final String NOT_PERMITTED = "Operation not permitted";

    /**
     * Refuses construction once an instance already exists, e.g. when the
     * constructor is invoked through reflection.
     */
    private StatementCache() {
        if (sCacheInstance != null) {
            throw new ActivityException(NOT_PERMITTED, "Use getInstance() to retrieve the instance of this class");
        }
    }

    /**
     * Returns the singleton, creating it on first use.
     *
     * @return the shared {@code StatementCache}
     */
    public static StatementCache getInstance() {
        // Fast path: already created, no locking required.
        if (sCacheInstance != null) {
            return sCacheInstance;
        }
        synchronized (StatementCache.class) {
            // Re-check under the lock so that only one thread constructs the instance.
            if (sCacheInstance == null) {
                sCacheInstance = new StatementCache();
            }
        }
        return sCacheInstance;
    }

    /**
     * Returns the prepared statement cached for {@code cql}, preparing it with
     * {@code session} and storing it if there is none yet.
     *
     * <p>A cache hit takes no lock. On a miss the lookup is repeated while
     * holding this instance's monitor, so callers racing on the same CQL do not
     * both prepare it. The cache key is the CQL text only; {@code session} is
     * used solely when a statement has to be prepared.
     *
     * @param session the session used to prepare the statement on a cache miss
     * @param cql the CQL text, also used as the cache key
     * @return the cached or newly prepared statement
     */
    public PreparedStatement getStatement(Session session, String cql) {
        PreparedStatement cached = holder.get(cql);
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Another thread may have prepared it while this one waited for the lock.
            cached = holder.get(cql);
            if (cached == null) {
                cached = session.prepare(cql);
                holder.put(cql, cached);
            }
            return cached;
        }
    }
}
使用这个缓存单例 class 会很简单:
/**
 * Example client of {@code StatementCache}: keeps the session it was given and
 * fetches prepared statements through the cache singleton.
 */
public class CacheConsumer{
    // Instance field rather than static: a static field written through the
    // constructor would let each new consumer replace the session of all others.
    private final Session session;

    /**
     * @param session the session passed to the cache when a statement must be prepared
     */
    CacheConsumer(Session session){
        this.session = session;
    }

    /**
     * Looks up the prepared statement for a fixed query via the cache.
     */
    public void someMethod(){
        String cqlstatement = "SELECT * FROM SOME_TABLE";
        PreparedStatement statement =
                StatementCache.getInstance().getStatement(session, cqlstatement);
        // You can now use the prepared statement however you wish.
    }
}
很简单 ;)