Does saveToCassandra work with the Cassandra Lucene plugin?
I am implementing the example from the Cassandra Lucene plugin page (https://github.com/Stratio/cassandra-lucene-index), and when I try to save data with saveToCassandra I get a NoSuchElementException. If I use CassandraConnector.withSessionDo instead, I can add rows to Cassandra and no exception is raised.
The table:
CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1};

USE demo;

CREATE TABLE tweets (
    id INT PRIMARY KEY,
    user TEXT,
    body TEXT,
    time TIMESTAMP,
    latitude FLOAT,
    longitude FLOAT
);

CREATE CUSTOM INDEX tweets_index ON tweets ()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
    'refresh_seconds' : '1',
    'schema' : '{
        fields : {
            id : {type : "integer"},
            user : {type : "string"},
            body : {type : "text", analyzer : "english"},
            time : {type : "date", pattern : "yyyy/MM/dd", sorted : true},
            place : {type : "geo_point", latitude : "latitude", longitude : "longitude"}
        }
    }'
};
The code:
import org.apache.spark.{SparkConf, SparkContext, Logging}
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._

object App extends Logging {
  def main(args: Array[String]) {
    // Get the Cassandra IP and create the Spark context
    val cassandraIP = System.getenv("CASSANDRA_IP")
    val sparkConf = new SparkConf(true)
      .set("spark.cassandra.connection.host", cassandraIP)
      .set("spark.cleaner.ttl", "3600")
      .setAppName("Simple Spark Cassandra Example")
    val sc = new SparkContext(sparkConf)

    // Works
    CassandraConnector(sparkConf).withSessionDo { session =>
      session.execute("INSERT INTO demo.tweets(id, user, body, time, latitude, longitude) VALUES (19, 'Name', 'Body', '2016-03-19 09:00:00-0300', 39, 39)")
    }

    // Does not work
    val demo = sc.parallelize(Seq((9, "Name", "Body", "2016-03-29 19:00:00-0300", 29, 29)))
    // Raises the exception
    demo.saveToCassandra("demo", "tweets", SomeColumns("id", "user", "body", "time", "latitude", "longitude"))
  }
}
The exception:
16/03/28 14:15:41 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.util.NoSuchElementException: Column not found in demo.tweets
at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName.apply(Schema.scala:60)
at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName.apply(Schema.scala:60)
at scala.collection.Map$WithDefault.default(Map.scala:52)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at com.datastax.spark.connector.cql.TableDef$$anonfun.apply(Schema.scala:153)
at com.datastax.spark.connector.cql.TableDef$$anonfun.apply(Schema.scala:152)
at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.cql.TableDef.<init>(Schema.scala:152)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables.apply(Schema.scala:283)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables.apply(Schema.scala:271)
at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
at scala.collection.immutable.Set$Set4.foreach(Set.scala:137)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables(Schema.scala:271)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces.apply(Schema.scala:295)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces.apply(Schema.scala:294)
at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces(Schema.scala:294)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra.apply(Schema.scala:307)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra.apply(Schema.scala:304)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo.apply(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo.apply(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:304)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at com.webradar.spci.spark.cassandra.App$.main(App.scala:27)
at com.webradar.spci.spark.cassandra.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
EDIT:
Versions:
- Spark 1.6.0
- Cassandra 3.0.3
- Lucene plugin 3.0.3.1
- To build the JAR I used the maven-assembly-plugin to get a fat JAR.
If I drop the custom index (DROP INDEX demo.tweets_index;), I can use saveToCassandra without any problem.
It seems the issue is in the Spark Cassandra connector, not in the plugin.
Because of CASSANDRA-10217, Cassandra 3.x no longer requires per-row indexes to be created on a fake column. As a result, since Cassandra 3.x the column-based syntax "CREATE CUSTOM INDEX %s ON %s(%s)" has been replaced by the new row-based syntax "CREATE CUSTOM INDEX %s ON %s()". However, the DataStax Spark connector does not seem to support this new feature yet.
When com.datastax.spark.connector.RDDFunctions.saveToCassandra is called, it tries to load the table schema along with the index schemas related to the table's columns. Because the new index syntax no longer has a fake column, the index's target column name is empty, which leads to the NoSuchElementException.
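A minimal sketch of that failure mode (hypothetical code, not the connector's actual implementation; the StructDef.columnByName and Map$WithDefault frames in the trace motivate it):

// Hypothetical stand-in for the connector's column lookup: a map with a
// throwing default, as suggested by StructDef.columnByName in the trace above.
val columnByName: Map[String, String] =
  Map("id" -> "int", "user" -> "text", "body" -> "text",
      "time" -> "timestamp", "latitude" -> "float", "longitude" -> "float")
    .withDefault(_ => throw new NoSuchElementException("Column not found in demo.tweets"))

// The row-based index declares no target column, so the lookup key is empty:
columnByName("")  // java.util.NoSuchElementException: Column not found in demo.tweets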
However, if you run the same example with the previous fake-column syntax, saveToCassandra works fine:
CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1};

USE demo;

CREATE TABLE tweets (
    id INT PRIMARY KEY,
    user TEXT,
    body TEXT,
    time TIMESTAMP,
    latitude FLOAT,
    longitude FLOAT,
    lucene TEXT
);

CREATE CUSTOM INDEX tweets_index ON tweets (lucene)
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
    'refresh_seconds' : '1',
    'schema' : '{
        fields : {
            id : {type : "integer"},
            user : {type : "string"},
            body : {type : "text", analyzer : "english"},
            time : {type : "date", pattern : "yyyy/MM/dd", sorted : true},
            place : {type : "geo_point", latitude : "latitude", longitude : "longitude"}
        }
    }'
};
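Alternatively, if you want to keep the new row-based index syntax until the connector supports it, you can bypass saveToCassandra and write each partition through the session path that already works. A minimal sketch reusing sparkConf and the demo RDD from the code above (the date pattern and the type boxing are my assumptions, not taken from the original code):

import com.datastax.spark.connector.cql.CassandraConnector

// CassandraConnector is serializable, so it can be used inside foreachPartition
val connector = CassandraConnector(sparkConf)
demo.foreachPartition { rows =>
  connector.withSessionDo { session =>
    val insert = session.prepare(
      "INSERT INTO demo.tweets (id, user, body, time, latitude, longitude) " +
        "VALUES (?, ?, ?, ?, ?, ?)")
    // Matches the '2016-03-29 19:00:00-0300' literals used above
    val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
    rows.foreach { case (id, user, body, time, lat, lon) =>
      // Bind with Java types the driver understands
      session.execute(insert.bind(
        Int.box(id), user, body, fmt.parse(time),
        Float.box(lat.toFloat), Float.box(lon.toFloat)))
    }
  }
}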