如何在 Cassandra 中使用 datastax java 驱动程序有效地使用准备好的语句?

How to use prepared statement efficiently using datastax java driver in Cassandra?

我需要使用 Datastax Java 驱动程序查询 Cassandra 中的一个表。下面是我的代码,它工作正常 -

public class TestCassandra {

        private Session session = null;
        private Cluster cluster = null;

        private static class ConnectionHolder {
            static final TestCassandra connection = new TestCassandra();

        public static TestCassandra getInstance() {
            return ConnectionHolder.connection;

        private TestCassandra() {
            Builder builder = Cluster.builder();

            PoolingOptions opts = new PoolingOptions();
            opts.setCoreConnectionsPerHost(HostDistance.LOCAL, opts.getCoreConnectionsPerHost(HostDistance.LOCAL));

            cluster = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE).withPoolingOptions(opts)
                    .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                    .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
            session = cluster.connect();

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();

        for (int table = 0; table < 14; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";

            try {
                SimpleStatement query = new SimpleStatement(sql);
                ResultSet res = session.execute(query);

                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();

                    String user_id = r.getString("user_id");
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));

        return userList;

我在我的主应用程序中像这样使用上面的 class -


有什么方法可以有效地在 getRandomUsers 中使用 PreparedStatement 吗?我想我需要确保只创建一次 PreparedStatement 而不是多次创建它。在我当前的架构中,最好的设计是什么?我该如何使用它?

您可以创建所需语句的缓存(这是一个相当基本的示例,可以让您了解)。让我们从创建将用作缓存的 class 开始。

private class StatementCache {
    Map<String, PreparedStatement> statementCache = new HashMap<>();
    public BoundStatement getStatement(String cql) {
        PreparedStatement ps = statementCache.get(cql);
        // no statement cached, create one and cache it now.
        if (ps == null) {
            ps = session.prepare(cql);
            statementCache.put(cql, ps);
        return ps.bind();


public class TestCassandra {
    private Session session = null;
    private Cluster cluster = null;
    private StatementCache psCache = new StatementCache();
    // rest of class...


private Set<String> getRandomUsers(String cql) {
// lots of code.    
        try {
            SimpleStatement query = new SimpleStatement(cql);
            // abstract the handling of the cache to it's own class.
            // this will need some work to make sure it's thread safe
            // as currently it's not.
            ResultSet res = session.execute(psCache.getStatement(cql));


 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import nl.ing.creditcards.commons.activity.ActivityException;

 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

public class StatementCache {

/* prevent cache incoherence issues*/
private static volatile StatementCache sCacheInstance;
private static final Map<String, PreparedStatement> holder = new ConcurrentHashMap<>();
private static final String NOT_PERMITTED = "Operation not permitted";

private StatementCache() {
    /*Prevent access through reflection api.*/
    if (sCacheInstance != null) {
        throw new ActivityException(NOT_PERMITTED, "Use getInstance() to retrieve the instance of this class");

 * Double check locking pattern usage for singleton classes
 * @return
public static StatementCache getInstance() {
    if (sCacheInstance == null) { //Check for the first time
        synchronized (StatementCache.class) { // second check in order to keep the operation atomic
            if (sCacheInstance == null) sCacheInstance = new StatementCache();
    return sCacheInstance;

 * If {@link StatementCache#getStatement#prepared_statement} is already present in cache,
 * then we don't have to synchronize and make threads wait, otherwise, we synchronize the caching bit.
 * @param session
 * @param cql
 * @return
public PreparedStatement getStatement(Session session, String cql) {
    PreparedStatement prepared_statement = holder.get(cql);
    if (prepared_statement == null) {
        synchronized (this) {
            prepared_statement = holder.get(cql);
            if (prepared_statement == null) {
                prepared_statement = session.prepare(cql);
                holder.put(cql, prepared_statement);
    return prepared_statement;

使用这个缓存单例 class 会很简单:

public class CacheConsumer{

    private static Session session;

    CacheConsumer(Session session){

    public void someMethod(){
      String cqlstatement = "SELECT * FROM SOME_TABLE";
      PreparedStatement statement= 
         // You can now use the prepared statement however you wish.

很简单 ;)