Read & write data into Cassandra using the Apache Flink Java API
I'm planning to use Apache Flink to read/write data into Cassandra. I was hoping to use flink-connector-cassandra, but I can't find good documentation/examples for the connector.
Can you please point me to the right way to read and write data from Cassandra using Apache Flink? I only see examples of sinks, which are purely for writing. Is Apache Flink also meant for reading data from Cassandra, similar to Apache Spark?
You can use a RichFlatMapFunction and extend the class:
import com.fasterxml.jackson.databind.JsonNode
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.mongodb.scala.model.Filters
import org.mongodb.scala.{Document, MongoClient, MongoCollection, ReadPreference}

class MongoMapper extends RichFlatMapFunction[JsonNode, JsonNode] {

  var userCollection: MongoCollection[Document] = _

  override def open(parameters: Configuration): Unit = {
    // Do something here like opening the connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")
    userCollection = client.getDatabase("gp_stage")
      .getCollection("users")
      .withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }

  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
    // Do something here per record; this function can use the objects
    // initialized in open() (somevalue is a placeholder for the lookup key)
    userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
      (result: Document) => {
        // println(result)
      },
      (t: Throwable) => {
        println(t)
      },
      () => {
        out.collect(event)
      }
    )
  }
}
Basically, the open function is executed once per worker, and flatMap is executed once per record. The example is for Mongo, but it can be applied similarly to Cassandra; a sketch of the same pattern against Cassandra follows.
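By way of illustration, here is a minimal Java sketch of the same open/flatMap pattern against Cassandra, using the DataStax 3.x driver; the contact point, keyspace/table, and column names are assumptions (borrowed from the snippets further down), not a confirmed schema:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CassandraMapper extends RichFlatMapFunction<String, String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per worker: open the connection here
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
        super.open(parameters);
    }

    @Override
    public void flatMap(String key, Collector<String> out) {
        // Runs once per record: look the key up and emit the matching value
        // (table and column names are assumptions)
        Row row = session.execute(
                "SELECT column2 FROM example.cassandraconnectorexample WHERE column1 = ?", key).one();
        if (row != null) {
            out.collect(row.getString("column2"));
        }
    }

    @Override
    public void close() throws Exception {
        // Release the connection when the task shuts down
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.close();
    }
}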
As I understand it, in your case the first step of your pipeline is reading data from Cassandra rather than writing to it, so instead of a RichFlatMapFunction you should write your own RichSourceFunction. For reference, you can look at the simple implementation of WikipediaEditsSource; a sketch of such a source follows.
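Here is a minimal sketch of what such a source might look like, modeled on the lifecycle of WikipediaEditsSource; the contact point, table, and column names are assumptions, and a real source would need proper cancellation and failure handling:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class CassandraRowSource extends RichSourceFunction<Tuple2<String, String>> {

    private transient Cluster cluster;
    private transient Session session;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per task: open the connection here
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) {
        // Emit one record per row (table and column names are assumptions)
        for (Row row : session.execute("SELECT column1, column2 FROM example.cassandraconnectorexample")) {
            if (!running) {
                break;
            }
            ctx.collect(Tuple2.of(row.getString("column1"), row.getString("column2")));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}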
I had the same question, and this is what I was looking for. I don't know if it's oversimplified for what you need, but I figured I'd show it nonetheless.
ClusterBuilder cb = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("urlToUse.com").withPort(9042).build();
    }
};

CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
        new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

Tuple2<String, String> testOutputTuple = new Tuple2<>();
cassandraInputFormat.nextRecord(testOutputTuple);

System.out.println("column1: " + testOutputTuple.f0);
System.out.println("column2: " + testOutputTuple.f1);
The way I figured this out was by finding the code for the CassandraInputFormat class and seeing how it worked (http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java). I honestly expected it to just be a format and not the full class for reading from Cassandra, given the name, and I have a feeling others might be thinking the same thing.
ClusterBuilder cb = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("localhost").withPort(9042).build();
    }
};

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

InputFormat<Tuple3<Integer, Integer, Integer>, ?> inputFormat =
        new CassandraInputFormat<>("SELECT * FROM test.example;", cb);
DataStreamSource<Tuple3<Integer, Integer, Integer>> t = env.createInput(
        inputFormat,
        TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}));

tableEnv.registerDataStream("t1", t);
Table t2 = tableEnv.sql("select * from t1");
t2.printSchema();
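printSchema only prints the schema; to actually see rows, the job still has to run. A minimal follow-up sketch, assuming the same pre-1.9 Table API used above:
// Convert the Table back to a stream, print each row, and run the pipeline
tableEnv.toAppendStream(t2, TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}))
        .print();
env.execute("cassandra read example");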