Cassandra is not working with UDT
I have a Java application with Spark-1.4.0, Cassandra-2.1.5, and Spark-Cassandra-connector-1.4.0-M1. In this application I try to store a Java bean class into a Cassandra table, either through the javaFunctions helper:
messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
        javaFunctions(arg0).writerBuilder(
                Properties.getString("spark.cassandra.keyspace"),
                Properties.getString("spark.cassandra.table"),
                mapToRow(Message.class)).saveToCassandra();
        return null;
    }
});
or through a DataFrame:
messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
        SQLContext sqlContext = SparkConnection.getSqlContext();
        DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
        df.write()
          .mode(SaveMode.Append)
          .option("keyspace", Properties.getString("spark.cassandra.keyspace"))
          .option("table", Properties.getString("spark.cassandra.table"))
          .format("org.apache.spark.sql.cassandra")
          .save();
        return null;
    }
});
But I get this error:
15/06/16 19:51:38 INFO CassandraConnector: Disconnected from Cassandra cluster: BDI Cassandra
15/06/16 19:51:39 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4, 192.168.1.19): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to com.datastax.spark.connector.UDTValue.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:44)
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:40)
at com.datastax.spark.connector.types.UserDefinedType$$anon$$anonfun$convertPF.applyOrElse(UserDefinedType.scala:33)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:40)
at com.datastax.spark.connector.types.UserDefinedType$$anon.convert(UserDefinedType.scala:31)
at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues.apply$mcVI$sp(SqlRowWriter.scala:21)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:20)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:8)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:35)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:135)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:119)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:102)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:101)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:130)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Previously, I could successfully save Message objects into the Cassandra table using the driver's mapper class:

MappingManager mapping = new MappingManager(session);
Mapper<Message> mapper = mapping.mapper(Message.class);
mapper.save(message);
Here is my Java bean:
import com.datastax.driver.mapping.annotations.Frozen;
import com.datastax.driver.mapping.annotations.FrozenKey;
import com.datastax.driver.mapping.annotations.Table;
@Table(name = "data")
public class Message implements Serializable{
private static final long serialVersionUID = 42L;
private String admin;
private String searchname;
private String searchsource;
private String searchtype;
private String messageid;
private String message;
@FrozenKey
private List<Action> actions;
@Frozen
private AdminCreator admincreator;
@Frozen
private AppReference appreference;
private String caption;
@Frozen
private Reference referencefrom;
private String icon;
private Boolean ishidden;
.....
.....
.....
This feature does not exist in the 1.4.0 release. I have a patch,
https://github.com/datastax/spark-cassandra-connector/pull/856
and it should be fixed in an upcoming release.
When you have a chance, please test it and comment on https://datastax-oss.atlassian.net/browse/SPARKC-271
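Until that patch lands, one possible workaround is to keep the Spark streaming pipeline but save each partition through the driver mapper that already works in the question, instead of the connector's writer. This is only a sketch: the session helper (CassandraSessionProvider here) is a hypothetical name standing in for however the application obtains a driver Session on the executors, and sessions should be reused rather than opened per batch.

messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> rdd, Time time) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Message>>() {
            @Override
            public void call(Iterator<Message> messages) throws Exception {
                // Hypothetical helper: returns a shared driver Session
                // connected to the same cluster and keyspace.
                Session session = CassandraSessionProvider.get();
                Mapper<Message> mapper =
                        new MappingManager(session).mapper(Message.class);
                // The driver mapper understands @Frozen/@FrozenKey UDT
                // fields, which is what fails in the connector's writer.
                while (messages.hasNext()) {
                    mapper.save(messages.next());
                }
            }
        });
        return null;
    }
});

Since the mapper path already worked for single objects, this mainly trades the connector's batching for per-object saves; mapper.saveAsync with bounded in-flight futures could recover some throughput.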