如何使用 datastax cassandra-core-api 将数据插入 4 节点 cassandra 集群?
How to insert data into a 4 node cassandra cluster using datastax cassandra-core-api?
我有一个单节点 (DataStax) Cassandra 集群,我需要从一个文件中插入大约 10 GB 的数据。我写了一个 Java 程序来读取文件并将数据存储如下:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
public class Xb {
//cluster and session for cassandra connection
private static Cluster cluster;
private static Session session;
//variables for storing file elements
private static String taxid;
private static String geneid;
private static String status;
private static String rna_version;
private static String rna_gi;
private static String protein_version;
private static String protein_gi;
private static String gen_nuc_ver;
private static String gen_nuc_gi;
private static String start_gen_acc;
private static String end_gen_acc;
private static String orientation;
private static String assembly;
private static String mature_ver;
private static String mature_gi;
private static String symbol;
//Connecting the cassandra node(local host)
public static Cluster connect(String node){
return Cluster.builder().addContactPoint(node).build();
}
public static void main(String[] args) {
private static String symbol;
long lStartTime = new Date().getTime();
// TODO Auto-generated method stub
//call connect by passing localhost
cluster =connect("localhost");
session = cluster.connect();
//session.execute("CREATE KEYSPACE test1 WITH REPLICATION =" +"{'class':'SimpleStrategy','replication_factor':3}");
//session.createtable('genomics');
//use test1 : triggers the use of test1 keyspace
session.execute("USE test1");
//for counting the lines in the file
int lineCount=0;
try
{
//Reading the file
FileReader fr = new FileReader("/home/syedammar/gene2refseq/gene2refseq");
BufferedReader bf = new BufferedReader(fr);
String line;
//iterating over each line in file
while((line= bf.readLine())!=null){
lineCount++;
//splitting the line based on tab spaces
String[] a =line.split("\s+");
System.out.println("Line Count now is ->"+lineCount);
//System.out.println("This is content"+line+" OVER HERE");
/*for(int i =0;i<a.length;i++){
System.out.println(i+"->"+a[i]);
}*/
//assigning the values to the corresponding variables
taxid =a[0];
geneid=a[1];
status=a[2];
rna_version=a[3];
rna_gi=a[4];
protein_version=a[5];
protein_gi=a[6];
gen_nuc_ver=a[7];
gen_nuc_gi=a[8];
start_gen_acc=a[9];
end_gen_acc=a[10];
orientation=a[11];
assembly=a[12];
mature_ver=a[13];
mature_gi=a[14];
symbol=a[15];
//Writing the insert query
PreparedStatement statement = session.prepare(
"INSERT INTO test.genomics " +
"(taxid, " +
"geneid, " +
"status, " +
"rna_version, " +
"rna_gi, " +
"protein_version, " +
"protein_gi, " +
"gen_nuc_ver, " +
"gen_nuc_gi, " +
"start_gen_acc, " +
"end_gen_acc, " +
"orientation, " +
"assembly, " +
"mature_ver, " +
"mature_gi," +
"symbol" +
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
//create the bound statement and initialise it with your prepared statement
BoundStatement boundStatement = new BoundStatement(statement);
session.execute( // this is where the query is executed
boundStatement.bind( // here you are binding the 'boundStatement'
taxid,geneid,status,rna_version,rna_gi,protein_version,protein_gi,gen_nuc_ver,gen_nuc_gi,start_gen_acc,end_gen_acc,orientation,assembly,mature_ver,mature_gi,symbol));
}//end of while
} //end of try
catch(IOException e){
e.printStackTrace();
}
long lEndTime = new Date().getTime();
long difference = lEndTime - lStartTime;
int seconds = (int) (difference / 1000) % 60 ; //converting milliseconds to seconds
System.out.println("Elapsed seconds: " + seconds);
System.out.println("No of lines read are :"+ lineCount);
System.out.println("Record's entered into cassandra successfully");
session.close();
cluster.close();http://whosebug.com/editing-help
}//end of m}// end of class
这很好用,我将记录存储在 Cassandra 中。
现在我已经设置了一个 4 节点的 Cassandra 集群,我想执行相同的任务来读取相同的文件并将其内容存储到 4 节点的集群中。
我的问题是我该怎么做,我需要将这个程序提供给哪个节点。我该如何处理?
我的问题是如何与 4 节点集群建立连接,我必须在上面的代码中进行哪些更改。好像这部分会有一些变化
// Creates a Cluster handle that uses the given node as its contact point.
public static Cluster connect(String node){
    Cluster.Builder builder = Cluster.builder();
    builder.addContactPoint(node);
    return builder.build();
}
有什么变化,我将该程序提供给哪个节点?我不清楚它会如何发生。另外让我知道将整个数据插入 4 节点集群所花费的时间是否与单节点所花费的时间相同,还是会更快。
谢谢
有关如何使用 DataStax java driver 将数据最好地加载到 Cassandra 的一个很好的示例(参考程序),请查看 Brian Hess's Cassandra-loader。
which node do I need to feed this program
所有的cassandra节点都是平等的,它们都可以写入。但是,driver 会为您解决这个问题。只需给它一些节点作为端点,当它建立连接时,它就会知道存在哪些节点。它还将知道哪些节点拥有哪些数据并相应地执行写入。
will it take the same amount of time to insert the entire data in 4
node cluster as it took for single node or will it be faster.
一旦您将复制因子 (replication factor, RF) 考虑在内,您的集群将随着您添加节点而线性扩展。因此,您将能够线性增加吞吐量。即,如果 RF 为 3 的 3 个节点可以进行 X 次写入,则 RF 为 3 的 6 个节点可以进行约 2X 次写入。
我有一个单节点 (DataStax) Cassandra 集群,我需要从一个文件中插入大约 10 GB 的数据。我写了一个 Java 程序来读取文件并将数据存储如下:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
public class Xb {
//cluster and session for cassandra connection
private static Cluster cluster;
private static Session session;
//variables for storing file elements
private static String taxid;
private static String geneid;
private static String status;
private static String rna_version;
private static String rna_gi;
private static String protein_version;
private static String protein_gi;
private static String gen_nuc_ver;
private static String gen_nuc_gi;
private static String start_gen_acc;
private static String end_gen_acc;
private static String orientation;
private static String assembly;
private static String mature_ver;
private static String mature_gi;
private static String symbol;
//Connecting the cassandra node(local host)
public static Cluster connect(String node){
return Cluster.builder().addContactPoint(node).build();
}
public static void main(String[] args) {
private static String symbol;
long lStartTime = new Date().getTime();
// TODO Auto-generated method stub
//call connect by passing localhost
cluster =connect("localhost");
session = cluster.connect();
//session.execute("CREATE KEYSPACE test1 WITH REPLICATION =" +"{'class':'SimpleStrategy','replication_factor':3}");
//session.createtable('genomics');
//use test1 : triggers the use of test1 keyspace
session.execute("USE test1");
//for counting the lines in the file
int lineCount=0;
try
{
//Reading the file
FileReader fr = new FileReader("/home/syedammar/gene2refseq/gene2refseq");
BufferedReader bf = new BufferedReader(fr);
String line;
//iterating over each line in file
while((line= bf.readLine())!=null){
lineCount++;
//splitting the line based on tab spaces
String[] a =line.split("\s+");
System.out.println("Line Count now is ->"+lineCount);
//System.out.println("This is content"+line+" OVER HERE");
/*for(int i =0;i<a.length;i++){
System.out.println(i+"->"+a[i]);
}*/
//assigning the values to the corresponding variables
taxid =a[0];
geneid=a[1];
status=a[2];
rna_version=a[3];
rna_gi=a[4];
protein_version=a[5];
protein_gi=a[6];
gen_nuc_ver=a[7];
gen_nuc_gi=a[8];
start_gen_acc=a[9];
end_gen_acc=a[10];
orientation=a[11];
assembly=a[12];
mature_ver=a[13];
mature_gi=a[14];
symbol=a[15];
//Writing the insert query
PreparedStatement statement = session.prepare(
"INSERT INTO test.genomics " +
"(taxid, " +
"geneid, " +
"status, " +
"rna_version, " +
"rna_gi, " +
"protein_version, " +
"protein_gi, " +
"gen_nuc_ver, " +
"gen_nuc_gi, " +
"start_gen_acc, " +
"end_gen_acc, " +
"orientation, " +
"assembly, " +
"mature_ver, " +
"mature_gi," +
"symbol" +
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
//create the bound statement and initialise it with your prepared statement
BoundStatement boundStatement = new BoundStatement(statement);
session.execute( // this is where the query is executed
boundStatement.bind( // here you are binding the 'boundStatement'
taxid,geneid,status,rna_version,rna_gi,protein_version,protein_gi,gen_nuc_ver,gen_nuc_gi,start_gen_acc,end_gen_acc,orientation,assembly,mature_ver,mature_gi,symbol));
}//end of while
} //end of try
catch(IOException e){
e.printStackTrace();
}
long lEndTime = new Date().getTime();
long difference = lEndTime - lStartTime;
int seconds = (int) (difference / 1000) % 60 ; //converting milliseconds to seconds
System.out.println("Elapsed seconds: " + seconds);
System.out.println("No of lines read are :"+ lineCount);
System.out.println("Record's entered into cassandra successfully");
session.close();
cluster.close();http://whosebug.com/editing-help
}//end of m}// end of class
这很好用,我将记录存储在 Cassandra 中。
现在我已经设置了一个 4 节点的 Cassandra 集群,我想执行相同的任务来读取相同的文件并将其内容存储到 4 节点的集群中。
我的问题是我该怎么做,我需要将这个程序提供给哪个节点。我该如何处理?
我的问题是如何与 4 节点集群建立连接,我必须在上面的代码中进行哪些更改。好像这部分会有一些变化
// Creates a Cluster handle that uses the given node as its contact point.
public static Cluster connect(String node){
    Cluster.Builder builder = Cluster.builder();
    builder.addContactPoint(node);
    return builder.build();
}
有什么变化,我将该程序提供给哪个节点?我不清楚它会如何发生。另外让我知道将整个数据插入 4 节点集群所花费的时间是否与单节点所花费的时间相同,还是会更快。
谢谢
有关如何使用 DataStax java driver 将数据最好地加载到 Cassandra 的一个很好的示例(参考程序),请查看 Brian Hess's Cassandra-loader。
which node do I need to feed this program
所有的cassandra节点都是平等的,它们都可以写入。但是,driver 会为您解决这个问题。只需给它一些节点作为端点,当它建立连接时,它就会知道存在哪些节点。它还将知道哪些节点拥有哪些数据并相应地执行写入。
will it take the same amount of time to insert the entire data in 4 node cluster as it took for single node or will it be faster.
一旦您将复制因子 (replication factor, RF) 考虑在内,您的集群将随着您添加节点而线性扩展。因此,您将能够线性增加吞吐量。即,如果 RF 为 3 的 3 个节点可以进行 X 次写入,则 RF 为 3 的 6 个节点可以进行约 2X 次写入。