如何修复无法在 {server ip}:9042 打开与 Cassandra 的本机连接
How to fix Failed to open native connection to Cassandra at {server ip}:9042
我正在尝试使用 spark-cassandra-connector 连接 spark 和 Cassandra。连接已建立,但是当我尝试对我面临的 JavaRDD 执行操作时。
java.io.IOException: Failed to open native connection to Cassandra at {10.0.21.92}:9042
这是我要实现的配置和代码:
SparkConf sparkConf = new SparkConf().setAppName("Data Transformation").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", server ip);
sparkConf.set("spark.cassandra.connection.port", "9042");
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.cassandra.auth.username", user_name);
sparkConf.set("spark.cassandra.auth.password", password);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
下面是我在 javardd 上执行操作的代码:
CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keySpaceName, tableName);
JavaRDD<GenericTriggerEntity> rdd = cassandraRDD.map(new Function<CassandraRow, GenericTriggerEntity>() {
private static final long serialVersionUID = -165799649937652815L;
@Override
public GenericTriggerEntity call(CassandraRow row) throws Exception {
GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
if(row.getString("end") != null) genericTriggerEntity.setEnd(row.getString("end"));
if(row.getString("key") != null)
genericTriggerEntity.setKey(row.getString("key"));
genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
genericTriggerEntity.setRowdeleted(row.getString("rowDeleted"));
genericTriggerEntity.setRows(row.getString("rows"));
genericTriggerEntity.setStart(row.getString("start"));
genericTriggerEntity.setTablename("tablename");
genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
genericTriggerEntity.setTriggertime(row.getString("triggertime"));
genericTriggerEntity.setUuid(row.getUUID("uuid"));
return genericTriggerEntity;
}
});
这是我正在执行的 JavaRDD 操作
JavaRDD<String> jsonDataRDDwords = rdd.flatMap(s -> Arrays.asList(SPACE.split((CharSequence) s)));
JavaPairRDD<String, Integer> jsonDataRDDones = jsonDataRDDwords.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> jsonDataRDDcounts = jsonDataRDDones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> jsonDatRDDoutput = jsonDataRDDcounts.collect();
我什至尝试通过 telnet 连接到 Cassandra 服务器,端口已打开。
我能够建立连接,但是在执行 reduceByKey 时出现上述异常。
我无法弄清楚问题出在哪里。 javardd 操作有问题吗?
任何帮助,将不胜感激。
提前致谢。
以上错误是由于cassandra 驱动核心的一些依赖问题造成的。
通过在我的 pom.xml
中添加度量依赖项来解决它
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>
您可以使用 socat 命令将本地端口转发到远程 cassandra 端口:
apt-get install socat
socat tcp-listen:9042,fork tcp:10.0.21.92:9042 &
我正在尝试使用 spark-cassandra-connector 连接 spark 和 Cassandra。连接已建立,但是当我尝试对我面临的 JavaRDD 执行操作时。
java.io.IOException: Failed to open native connection to Cassandra at {10.0.21.92}:9042
这是我要实现的配置和代码:
SparkConf sparkConf = new SparkConf().setAppName("Data Transformation").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", server ip);
sparkConf.set("spark.cassandra.connection.port", "9042");
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.cassandra.auth.username", user_name);
sparkConf.set("spark.cassandra.auth.password", password);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
下面是我在 javardd 上执行操作的代码:
CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keySpaceName, tableName);
JavaRDD<GenericTriggerEntity> rdd = cassandraRDD.map(new Function<CassandraRow, GenericTriggerEntity>() {
private static final long serialVersionUID = -165799649937652815L;
@Override
public GenericTriggerEntity call(CassandraRow row) throws Exception {
GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
if(row.getString("end") != null) genericTriggerEntity.setEnd(row.getString("end"));
if(row.getString("key") != null)
genericTriggerEntity.setKey(row.getString("key"));
genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
genericTriggerEntity.setRowdeleted(row.getString("rowDeleted"));
genericTriggerEntity.setRows(row.getString("rows"));
genericTriggerEntity.setStart(row.getString("start"));
genericTriggerEntity.setTablename("tablename");
genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
genericTriggerEntity.setTriggertime(row.getString("triggertime"));
genericTriggerEntity.setUuid(row.getUUID("uuid"));
return genericTriggerEntity;
}
});
这是我正在执行的 JavaRDD 操作
JavaRDD<String> jsonDataRDDwords = rdd.flatMap(s -> Arrays.asList(SPACE.split((CharSequence) s)));
JavaPairRDD<String, Integer> jsonDataRDDones = jsonDataRDDwords.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> jsonDataRDDcounts = jsonDataRDDones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> jsonDatRDDoutput = jsonDataRDDcounts.collect();
我什至尝试通过 telnet 连接到 Cassandra 服务器,端口已打开。
我能够建立连接,但是在执行 reduceByKey 时出现上述异常。
我无法弄清楚问题出在哪里。 javardd 操作有问题吗? 任何帮助,将不胜感激。 提前致谢。
以上错误是由于cassandra 驱动核心的一些依赖问题造成的。 通过在我的 pom.xml
中添加度量依赖项来解决它<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>
您可以使用 socat 命令将本地端口转发到远程 cassandra 端口:
apt-get install socat
socat tcp-listen:9042,fork tcp:10.0.21.92:9042 &