Java code to use a semaphore with Cassandra to throttle executeAsync writes and eliminate NoHostAvailableException errors
I have some basic code that uses a prepared statement in a for loop to write results to a Cassandra table, with some throttling via a semaphore.
Session session = null;
try {
    session = connector.openSession();
} catch( Exception ex ) {
    // .. moan and complain..
    System.err.printf("Got %s trying to openSession - %s\n", ex.getClass().getCanonicalName(), ex.getMessage() );
}

if( session != null ) {
    // Prepared statement for Cassandra inserts
    PreparedStatement statement = session.prepare(
            "INSERT INTO model.base " +
            "(channel, " +
            "time_key, " +
            "power" +
            ") VALUES (?,?,?);");
    BoundStatement boundStatement = new BoundStatement(statement);

    // Query a Cassandra table that has capital letters in the column names
    ResultSet results = session.execute("SELECT \"Time_Key\",\"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\" limit 800000;");

    // Get the variables from each row of Cassandra data
    for (Row row : results) {
        // Upper-case column names in Cassandra
        time_key = row.getLong("Time_Key");
        start_frequency = row.getDouble("Start_Frequency");
        power = row.getFloat("Power");
        bandwidth = row.getDouble("Bandwidth");

        // Create channel power buckets, bind the values into the prepared statement, and write to Cassandra
        for( channel = 1.6000E8; channel <= channel_end; channel += increment ) {
            if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
                ResultSetFuture rsf = session.executeAsync(boundStatement.bind(channel, time_key, power));
                backlogList.add( rsf );                    // put the new one at the end of the list
                if( backlogList.size() > 10000 ) {         // wait till we have a few
                    while( backlogList.size() > 5432 ) {   // then harvest about half of the oldest ones of them
                        rsf = backlogList.remove(0);
                        rsf.getUninterruptibly();
                    } // end while
                } // end if
            } // end if
        } // end channel for
    } // end row for
} // end session
My connection is established with:
public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Syntax: com.neutronis.Spark_Reports <Spark Master URL> <Cassandra contact point>");
        System.exit(1);
    }

    SparkConf conf = new SparkConf();
    conf.setAppName("Spark Reports");
    conf.setMaster(args[0]);
    conf.set("spark.cassandra.connection.host", args[1]);

    Spark_Reports app = new Spark_Reports(conf);
    app.run();
}
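The connector used in the first snippet comes from the Spark Cassandra Connector; a trimmed-down sketch of that wiring (class body simplified to just the relevant parts, field names are illustrative) looks roughly like:

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;

public class Spark_Reports {

    private final CassandraConnector connector;

    public Spark_Reports(SparkConf conf) {
        // CassandraConnector picks up spark.cassandra.connection.host from the SparkConf
        this.connector = CassandraConnector.apply(conf);
    }

    public void run() {
        Session session = null;
        try {
            session = connector.openSession();
        } catch (Exception ex) {
            System.err.printf("Got %s trying to openSession - %s\n",
                    ex.getClass().getCanonicalName(), ex.getMessage());
        }
        // ... prepared statement, query and throttled writes shown above ...
    }
}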
With this code I am trying to throttle with a semaphore, but my Cassandra cluster still seems to get overloaded and throws this error:
ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Oddly, it says no host was tried.
I have looked at other questions about throttling with a semaphore, such as this and this, and tried to apply them to my code above, but I still get the error.
See my answer to this question for how to apply backpressure when using asynchronous calls: What is the best way to get backpressure for Cassandra Writes?
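The gist of that approach is to bound the number of in-flight executeAsync calls with a Semaphore: acquire a permit before submitting each write and release it from the future's callback once the write completes. Below is a minimal sketch of that idea (not the exact code from that answer), assuming the DataStax Java driver 2.x/3.x, where ResultSetFuture is a Guava ListenableFuture and the two-argument Futures.addCallback overload is available; the ThrottledWriter class name and the cap of 1024 in-flight writes are just illustrative starting points to tune against your cluster:

import java.util.concurrent.Semaphore;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

public class ThrottledWriter {
    // Arbitrary cap on concurrent async writes; tune for your cluster.
    private static final int MAX_IN_FLIGHT = 1024;

    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
    private final Session session;
    private final PreparedStatement insert;

    public ThrottledWriter(Session session) {
        this.session = session;
        this.insert = session.prepare(
                "INSERT INTO model.base (channel, time_key, power) VALUES (?,?,?);");
    }

    public void write(double channel, long timeKey, float power) throws InterruptedException {
        // Block here once MAX_IN_FLIGHT writes are already outstanding.
        permits.acquire();
        // Bind a fresh BoundStatement per write so concurrent inserts do not share state.
        BoundStatement bound = insert.bind(channel, timeKey, power);
        ResultSetFuture future = session.executeAsync(bound);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet rs) {
                permits.release();
            }

            @Override
            public void onFailure(Throwable t) {
                permits.release();
                System.err.printf("Async insert failed: %s%n", t);
            }
        });
    }

    public void awaitCompletion() throws InterruptedException {
        // When all permits are back, every in-flight write has completed.
        permits.acquire(MAX_IN_FLIGHT);
        permits.release(MAX_IN_FLIGHT);
    }
}

With something like this, each session.executeAsync(boundStatement.bind(...)) in the loop above becomes writer.write(channel, time_key, power), followed by a single awaitCompletion() after the row loop, so the backlog list and its harvesting logic are no longer needed.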