Fail to open thrift connection to Cassandra node via spark-cassandra-connector

I've been struggling with this all day and haven't found a solution.

I am trying to connect to a remote Cassandra node from a Spark Streaming application using the spark-cassandra-connector, but the application fails with an exception. Any help would be greatly appreciated.
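
For context, the connection is wired up roughly like this (a minimal sketch rather than my full streaming job; the keyspace vehicles comes from the log below, while the table name and the setMaster value are just illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._   // adds cassandraTable() to SparkContext

object ConnectTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cassandra-connect-test")
      .setMaster("local[*]")                                   // illustrative; the driver runs on my local machine
      // the remote node is addressed by its public IP (placeholder)
      .set("spark.cassandra.connection.host", "<MY_PUBLIC_IP>")

    val sc = new SparkContext(conf)

    // "vehicles" is the keyspace from the log; "positions" is a hypothetical table name
    val rows = sc.cassandraTable("vehicles", "positions")
    println(rows.count())

    sc.stop()
  }
}

Reading a Cassandra table this way is what triggers the token-range split fetching that fails in the stack trace below: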

2015-02-17 19:13:58 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:13:58 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:13:59 DEBUG ControlConnection:262 - [Control connection] Refreshing schema
2015-02-17 19:14:00 DEBUG ControlConnection:492 - [Control connection] Refreshing node list and token map
2015-02-17 19:14:00 DEBUG ControlConnection:172 - [Control connection] Successfully connected to /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:00 INFO  Cluster:1267 - New Cassandra host /<MY_PUBLIC_IP>:9042 added
2015-02-17 19:14:00 INFO  CassandraConnector:51 - Connected to Cassandra cluster: Test Cluster
2015-02-17 19:14:00 INFO  LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Connection:114 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=false] Transport initialized and ready
2015-02-17 19:14:01 DEBUG Session:304 - Added connection pool for /<MY_PUBLIC_IP>:9042
2015-02-17 19:14:01 INFO  LocalNodeFirstLoadBalancingPolicy:59 - Adding host <MY_PUBLIC_IP> (datacenter1)
2015-02-17 19:14:01 DEBUG Schema:55 - Retrieving database schema from cluster Test Cluster...
2015-02-17 19:14:01 DEBUG Schema:55 - 1 keyspaces fetched from cluster Test Cluster: {vehicles}
2015-02-17 19:14:02 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <MY_PUBLIC_IP>:9160
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-3, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 DEBUG Cluster:1340 - Shutting down
2015-02-17 19:14:02 DEBUG Connection:428 - Connection[/<MY_PUBLIC_IP>:9042-2, inFlight=0, closed=true] closing connection
2015-02-17 19:14:02 INFO  CassandraConnector:51 - Disconnected from Cassandra cluster: Test Cluster
2015-02-17 19:14:03 DEBUG CassandraConnector:55 - Attempting to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.491517 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$c]
2015-02-17 19:14:10 DEBUG HeartbeatReceiver:56 - [actor] handled message (69.725123 ms) Heartbeat(localhost,[Lscala.Tuple2;@77008370,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$b]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:50 - [actor] received message Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:50 - [actor] received message BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.348586 ms) BlockManagerHeartbeat(BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$e]
2015-02-17 19:14:20 DEBUG HeartbeatReceiver:56 - [actor] handled message (2.020429 ms) Heartbeat(localhost,[Lscala.Tuple2;@70a7cd6e,BlockManagerId(<driver>, Alon-PC, 62343, 0)) from Actor[akka://sparkDriver/temp/$d]
2015-02-17 19:14:24 ERROR ServerSideTokenRangeSplitter:88 - Failure while fetching splits from Cassandra
java.io.IOException: Failed to open thrift connection to Cassandra at <AWS_LOCAL_IP>:9160
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:132)
    at com.datastax.spark.connector.cql.CassandraConnector.withCassandraClientDo(CassandraConnector.scala:141)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.com$datastax$spark$connector$rdd$partitioner$ServerSideTokenRangeSplitter$$fetchSplits(ServerSideTokenRangeSplitter.scala:33)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$$anonfun$apply.apply(ServerSideTokenRangeSplitter.scala:45)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$$anonfun$apply.apply(ServerSideTokenRangeSplitter.scala:45)
    at scala.util.Try$.apply(Try.scala:161)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun.apply(ServerSideTokenRangeSplitter.scala:45)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun.apply(ServerSideTokenRangeSplitter.scala:44)
    at scala.collection.immutable.Stream.map(Stream.scala:376)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:44)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf.apply(CassandraRDDPartitioner.scala:77)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf.apply(CassandraRDDPartitioner.scala:76)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
    at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(TFramedTransportFactory.java:41)
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:47)
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:127)
    ... 41 more
Caused by: java.net.ConnectException: Connection timed out: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:180)
    ... 45 more
Exception in thread "main" java.io.IOException: Failed to fetch splits of TokenRange(0,0,Set(CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)),None) from all endpoints: CassandraNode(/<AWS_LOCAL_IP>,/<MY_PUBLIC_IP>)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split.apply(ServerSideTokenRangeSplitter.scala:55)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter$$anonfun$split.apply(ServerSideTokenRangeSplitter.scala:49)
    at scala.Option.getOrElse(Option.scala:120)
    at com.datastax.spark.connector.rdd.partitioner.ServerSideTokenRangeSplitter.split(ServerSideTokenRangeSplitter.scala:49)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf.apply(CassandraRDDPartitioner.scala:77)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$$anonfun$com$datastax$spark$connector$rdd$partitioner$CassandraRDDPartitioner$$splitsOf.apply(CassandraRDDPartitioner.scala:76)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.flatmap2combiner(ParArray.scala:418)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1075)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1071)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:492)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:64)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:961)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:956)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-02-17 19:14:24 DEBUG DiskBlockManager:63 - Shutdown hook called

At first it looks fine (it connects successfully and fetches the keyspace...), but then, when it tries to open the thrift connection, it fails, disconnects and shuts down.

I have already opened ports 9160, 9042 and 7000.

Then, in cassandra.yaml, I set:

listen_address: <AWS_LOCAL_IP>
broadcast_address: <MY_PUBLIC_IP>

What am I missing?

OK, I was about to post this as a question, but then I finally solved it:

In cassandra.yaml I had to set:

rpc_address: 0.0.0.0
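
For completeness, the relevant network settings in my cassandra.yaml now look like this (a sketch; the addresses are placeholders and everything else is left at its default):

# cassandra.yaml network settings (addresses are placeholders)
listen_address: <AWS_LOCAL_IP>      # address the node binds to for inter-node (gossip) traffic
broadcast_address: <MY_PUBLIC_IP>   # public address advertised to other nodes outside AWS
rpc_address: 0.0.0.0                # accept client connections (thrift and native transport) on all interfaces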

Other Stack Overflow questions helped me get here, but I'm posting this answer because the stack trace may help other people find it.