Why doesn't the map function of sc.cassandraTable("test", "users").select("username") work?
Following the spark-cassandra-connector demo and Installing the Cassandra / Spark OSS Stack, I tried the following snippet in spark-shell:
sc.stop
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "172.21.0.131")
.set("spark.cassandra.auth.username", "adminxx")
.set("spark.cassandra.auth.password", "adminxx")
val sc = new SparkContext("172.21.0.131", "Cassandra Connector Test", conf)
val rdd = sc.cassandraTable("test", "users").select("username")
Many operators on rdd work fine, for example:
rdd.first
rdd.count
But when I use map:
val result = rdd.map(x => 1) // just a trivial mapper
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at map at <console>:32
Then, when I run:
result.first
I get the following error:
15/12/11 15:09:00 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 104, 124.250.36.124): java.lang.ClassNotFoundException:
$line346.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun
Caused by: java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at org.apache.spark.serializer.JavaDeserializationStream$$anon.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
I don't know why this error occurs. Any advice would be appreciated!
Update:
Following @RussSpitzer's answer to CassandraRdd.map( row => row.getInt("id)) does not work , java.lang.ClassNotFoundException happened!, I resolved this error: instead of calling sc.stop and creating a new SparkContext, I started spark-shell with these options:
bin/spark-shell --conf spark.cassandra.connection.host=172.21.0.131 --conf spark.cassandra.auth.username=adminxx --conf spark.cassandra.auth.password=adminxx
Then all the other steps were the same, and it worked fine.
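As a hedged aside (not part of the original question): the same connector settings could presumably also be kept in conf/spark-defaults.conf so that every spark-shell session picks them up, assuming a standard Spark installation layout:

# conf/spark-defaults.conf -- assumed standard Spark config file
spark.cassandra.connection.host   172.21.0.131
spark.cassandra.auth.username     adminxx
spark.cassandra.auth.password     adminxx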
Spark applications normally send your compiled code to the executors as a jar file. That way, the function you pass to map is present on the executors.
The situation is trickier with spark-shell. It has to compile and broadcast the code for every line you type interactively. There is not even a class that you are operating inside of; it creates these fake $$iwC$$ classes to work around that.
Usually this works well, but you may have hit a spark-shell bug. You can try to work around it by putting your code inside a named object in spark-shell:
object Obj { val mapper = { row: com.datastax.spark.connector.CassandraRow => 1 } } // element type matches cassandraTable's rows
val result = rdd.map(Obj.mapper)
But it is probably safest to implement your code as a standalone application instead of just typing it into spark-shell.
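For illustration, here is a minimal sketch of such a standalone application, assuming the spark-cassandra-connector dependency is on the build path and reusing the keyspace, table, and settings from the question (the object name is made up):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._   // adds cassandraTable to SparkContext

// Hypothetical application object; compiled into the application jar, so the
// map closure below ships to the executors with the jar instead of being a
// REPL-generated anonymous class.
object CassandraMapExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("Cassandra Connector Test")
      .set("spark.cassandra.connection.host", "172.21.0.131")
      .set("spark.cassandra.auth.username", "adminxx")
      .set("spark.cassandra.auth.password", "adminxx")
    val sc = new SparkContext(conf)

    val rdd = sc.cassandraTable("test", "users").select("username")
    val result = rdd.map(_ => 1)   // same trivial mapper as in the question
    println(result.first)

    sc.stop()
  }
}

Submitted with spark-submit (with the connector assembly jar on --jars; jar names are hypothetical), the closure is loaded from the application jar rather than from classes the shell generates on the fly.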
Russell Spitzer's answer from the spark-connector-user list:
I'm pretty sure the main problem here is that you start a context with --jars and then kill that context and then start another one. Try simplifying your code: instead of setting all of those Spark conf options and creating a new context, run your shell like this. Also, the jar that you want on the classpath is the connector assembly jar, not a custom build of a Scala script you want to run.
./spark-shell --conf spark.cassandra.connection.host=10.129.20.80 ...
You should not need to modify the ack.wait.timeout or the executor.extraClasspath.
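Putting this together, a hedged example of the kind of invocation the answer describes (the assembly jar path is hypothetical; the connector settings are the ones from the question):

./spark-shell --jars /path/to/spark-cassandra-connector-assembly.jar \
  --conf spark.cassandra.connection.host=172.21.0.131 \
  --conf spark.cassandra.auth.username=adminxx \
  --conf spark.cassandra.auth.password=adminxx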