如何使用 datastax cassandra-core-api 将数据插入 4 节点 cassandra 集群?

How to insert data into a 4 node cassandra cluster using datastax cassandra-core-api?

我有一个单节点 (DataStax) Casandra 集群,我不得不从一个文件中插入大约 10gb 的数据。我写了一个 java 程序来读取文件并将数据存储如下:

 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Date;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;

 public class Xb {

//cluster and session for cassandra connection
private static Cluster cluster;
private static Session session;

//variables for storing file elements
private static String taxid;
private static String geneid;
private static String status;
private static String rna_version;
private static String rna_gi;

private static String protein_version;
private static String protein_gi;
private static String gen_nuc_ver;

private static String gen_nuc_gi;
private static String start_gen_acc;
private static String end_gen_acc;

private static String orientation;
private static String assembly;

     private static String mature_ver;

     private static String mature_gi;

     private static String symbol;

    //Connecting the cassandra node(local host)
    public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
   }
    public static void main(String[] args) {
    private static String symbol;
    long lStartTime = new Date().getTime();
    // TODO Auto-generated method stub
    //call connect by passing localhost 
    cluster =connect("localhost");
    session = cluster.connect();
    //session.execute("CREATE KEYSPACE test1 WITH REPLICATION =" +"{'class':'SimpleStrategy','replication_factor':3}");
    //session.createtable('genomics');
    //use test1 : triggers the use of test1 keyspace
    session.execute("USE test1");
    //for counting the lines in the file
    int lineCount=0;

    try
    {
        //Reading the file
        FileReader fr = new FileReader("/home/syedammar/gene2refseq/gene2refseq");
        BufferedReader bf = new BufferedReader(fr);
        String line;
        //iterating over each line in file
        while((line= bf.readLine())!=null){
                lineCount++;
                //splitting the line based on tab spaces
                String[] a =line.split("\s+");
                System.out.println("Line Count now is ->"+lineCount);
                //System.out.println("This is content"+line+" OVER HERE");
                /*for(int i =0;i<a.length;i++){
                System.out.println(i+"->"+a[i]);
              }*/
                //assigning the values to the corresponding variables
                taxid =a[0];
                geneid=a[1];
                status=a[2];
                rna_version=a[3];
                rna_gi=a[4];
                protein_version=a[5];
                protein_gi=a[6]; 
                gen_nuc_ver=a[7];
                gen_nuc_gi=a[8];
                start_gen_acc=a[9];
                end_gen_acc=a[10];
                orientation=a[11];
                assembly=a[12];
                mature_ver=a[13];
                mature_gi=a[14];
                symbol=a[15];

            //Writing the insert query
            PreparedStatement statement = session.prepare(
            "INSERT INTO test.genomics " +
            "(taxid, " +
            "geneid, " +
            "status, " +
            "rna_version, " +
            "rna_gi, " +
            "protein_version, " +
            "protein_gi, " +
            "gen_nuc_ver, " +
            "gen_nuc_gi, " +
            "start_gen_acc, " +
            "end_gen_acc, " +
            "orientation, " +
            "assembly, " +
            "mature_ver, " +
            "mature_gi," +
            "symbol" + 
            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"); 

            //create the bound statement and initialise it with your prepared statement
            BoundStatement boundStatement = new BoundStatement(statement); 

            session.execute( // this is where the query is executed
            boundStatement.bind( // here you are binding the 'boundStatement'
            taxid,geneid,status,rna_version,rna_gi,protein_version,protein_gi,gen_nuc_ver,gen_nuc_gi,start_gen_acc,end_gen_acc,orientation,assembly,mature_ver,mature_gi,symbol));
    }//end of while
} //end of try
    catch(IOException e){
        e.printStackTrace();
    }   
        long lEndTime = new Date().getTime(); 
        long difference = lEndTime - lStartTime;
        int seconds = (int) (difference / 1000) % 60 ; //converting milliseconds to seconds
        System.out.println("Elapsed seconds: " + seconds);
        System.out.println("No of lines read are :"+ lineCount);
        System.out.println("Record's entered into cassandra successfully");

        session.close();
        cluster.close();http://whosebug.com/editing-help

    }//end of m}// end of class

这很好用,我将记录存储在 Cassandra 中。

现在我已经设置了一个 4 节点的 Cassandra 集群,我想执行相同的任务来读取相同的文件并将其内容存储到 4 节点的集群中。

我的问题是我该怎么做,我需要将这个程序提供给哪个节点。我该如何处理?

我的问题是如何与 4 节点集群建立连接,我必须在上面的代码中进行哪些更改。好像这部分会有一些变化

 public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
} 

有什么变化,我将该程序提供给哪个节点?我不清楚它会如何发生。另外让我知道将整个数据插入 4 节点集群所花费的时间是否与单节点所花费的时间相同,还是会更快。

谢谢

有关如何使用 DataStax java driver 将数据最好地加载到 Cassandra 的一个很好的示例(参考程序),请查看 Brian Hess's Cassandra-loader

which node do I need to feed this program

所有的cassandra节点都是平等的,它们都可以写入。但是,driver 会为您解决这个问题。只需给它一些节点作为端点,当它建立连接时,它就会知道存在哪些节点。它还将知道哪些节点拥有哪些数据并相应地执行写入。

will it take the same amount of time to insert the entire data in 4 node cluster as it took for single node or will it be faster.

一旦您将复制因素考虑在内,您的集群将随着您添加节点而线性扩展。因此,您将能够线性增加吞吐量。即,如果 3 个节点 RF3 可以进行 X 次写入,则具有 RF3 的 6 个节点可以进行 ~2X 次写入。