Save object in cassandra using spark and java

Just getting started with Spark and Cassandra in Java, and I am already unable to save data to my Cassandra database. Here is my Java bean class:
public class User implements Serializable {
    public User() {}

    public User(String username, String password) {
        this.username = username;
        setPassword(password);
    }

    public User(String username, String password, boolean admin) {
        this.username = username;
        this.admin = admin;
        setPassword(password);
    }

    private String username;
    public String getUserName() { return username; }
    public void setUsername(String username) { this.username = username; }

    private String password;
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    private Boolean admin = false;
    public boolean isAdmin() { return admin; }
    public void setAdmin(boolean admin) { this.admin = admin; }

    private Calendar dateRegistered = Calendar.getInstance();
    public Calendar getDateRegistered() { return dateRegistered; }
}
I have established a connection to my Cassandra database, and I try to save the data as follows:
JavaRDD<User> usersRDD = sparkContext.parallelize(users);
javaFunctions(usersRDD).writerBuilder("database", "users", mapToRow(User.class)).saveToCassandra();
where users is a list of instantiated users. When I run this, I get the following error:
java.lang.IllegalArgumentException: requirement failed: Columns not found in class com.app.models.User: [username]
at scala.Predef$.require(Predef.scala:233)
at com.datastax.spark.connector.mapper.ReflectionColumnMapper.columnMapForWriting(ReflectionColumnMapper.scala:91)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon.<init>(MappedToGettableDataConverter.scala:27)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:18)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:269)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:59)
at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:443)
at com.autobot.context.SparkContext.createUsers(SparkContext.java:56)
at com.autobot.context.SparkContext.createUser(SparkContext.java:51)
at com.autobot.user.UserTest.saveUser(UserTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Perhaps this exception is caused by the inconsistent casing of username in the getters/setters? I think it should be:
private String userName;
public String getUserName(){ return username; }
public void setUserName(String username){ this.username = username; }
I know the answer above is correct, but I would like to add some extra explanation. The following article helps in understanding how the mapping between Cassandra columns and Scala/Java properties works for a table/class:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
The column-property naming convention is:

Cassandra column name    Scala/Java property name
count                    count
column_1                 column1
user_name                userName
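To make the convention concrete, here is a minimal sketch of a bean whose property names follow the connector's default mapping (the class name UserBean and the columns it implies, such as user_name, are assumptions for illustration, not the original poster's schema). Note that the getter and setter use the same casing, which is what the ReflectionColumnMapper requires to discover the property:

```java
import java.io.Serializable;

// Hypothetical bean: each camelCase property maps to a snake_case
// Cassandra column under the connector's default convention
// (userName -> user_name, admin -> admin).
public class UserBean implements Serializable {
    private String userName;  // maps to column user_name
    private boolean admin;    // maps to column admin

    // Getter and setter casing must match, otherwise the connector's
    // reflection-based mapper cannot pair them into one property.
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public boolean isAdmin() { return admin; }
    public void setAdmin(boolean admin) { this.admin = admin; }

    public static void main(String[] args) {
        UserBean u = new UserBean();
        u.setUserName("alice");
        u.setAdmin(true);
        System.out.println(u.getUserName() + " " + u.isAdmin()); // prints "alice true"
    }
}
```

With a bean like this, the original save call pattern (javaFunctions(rdd).writerBuilder(keyspace, table, mapToRow(UserBean.class)).saveToCassandra()) would look for columns user_name and admin in the target table.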