How To Run Multiple Spark Cassandra Queries
I need to run a task like the one below. Somehow I am missing a point. I know that I cannot use JavaSparkContext like this and pass JavaFunctions from inside the task, because there is a serialization issue.
I need to run multiple Cassandra queries, cartesian.size() of them. Any suggestions?
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<DateTime> dateTimeJavaRDD = jsc.parallelize(dateTimes); //List<DateTime>
JavaRDD<Integer> virtualPartitionJavaRDD = jsc.parallelize(virtualPartitions); //List<Integer>
JavaPairRDD<DateTime, Integer> cartesian = dateTimeJavaRDD.cartesian(virtualPartitionJavaRDD);
long c = cartesian.map(new Function<Tuple2<DateTime, Integer>, Long>() {
    @Override
    public Long call(Tuple2<DateTime, Integer> tuple2) throws Exception {
        return javaFunctions(jsc).cassandraTable("keyspace", "table")
                .where("p1 = ? and p2 = ?", tuple2._1(), tuple2._2())
                .count();
    }
}).reduce((a, b) -> a + b);
System.out.println("TOTAL ROW COUNT IS: " + c);
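For intuition, the aggregation this job tries to compute is just a sum of one count per (p1, p2) pair over the cartesian product. A plain-Java sketch of that shape (no Spark involved; countFor is a hypothetical stand-in for the per-pair Cassandra query) looks like:

```java
import java.util.Arrays;
import java.util.List;

public class CartesianCountSketch {
    // Hypothetical stand-in for the per-(p1, p2) Cassandra count query.
    static long countFor(String p1, int p2) {
        return 1; // pretend every (p1, p2) partition holds exactly one row
    }

    // Cartesian product -> map each pair to a count -> reduce by summing:
    // the same shape as the Spark job above.
    static long totalCount(List<String> dateTimes, List<Integer> virtualPartitions) {
        return dateTimes.stream()
                .flatMap(dt -> virtualPartitions.stream().map(vp -> countFor(dt, vp)))
                .mapToLong(Long::longValue)
                .sum();
    }

    public static void main(String[] args) {
        List<String> dateTimes = Arrays.asList("2020-01-01", "2020-01-02");
        List<Integer> virtualPartitions = Arrays.asList(0, 1, 2);
        // 2 dates * 3 virtual partitions = 6 queries, one count each
        System.out.println("TOTAL ROW COUNT IS: "
                + totalCount(dateTimes, virtualPartitions));
    }
}
```

The serialization problem in the Spark version comes from trying to run the inner query (countFor here) inside the map closure while capturing jsc, which is exactly what the join-based answer below avoids.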
The correct solution is to perform a join between your data and the Cassandra table. There is a joinWithCassandraTable function that does what you need: you just generate an RDD of Tuple2 that contains the values for p1 and p2, and then call joinWithCassandraTable, something like this (not tested, adapted from my example here):
// Keep the key types matching the table's partition key columns
// (here p1 is a DateTime and p2 an Integer, as in your cartesian RDD).
JavaRDD<Tuple2<DateTime, Integer>> trdd = cartesian.map(
        new Function<Tuple2<DateTime, Integer>, Tuple2<DateTime, Integer>>() {
            @Override
            public Tuple2<DateTime, Integer> call(Tuple2<DateTime, Integer> tuple2) throws Exception {
                return new Tuple2<>(tuple2._1(), tuple2._2());
            }
        });
CassandraJavaPairRDD<Tuple2<DateTime, Integer>, Tuple2<Integer, String>> joinedRDD =
        javaFunctions(trdd).joinWithCassandraTable("test", "jtest",
                someColumns("p1", "p2"), someColumns("p1", "p2"),
                mapRowToTuple(Integer.class, String.class),
                mapTupleToRow(DateTime.class, Integer.class));
// perform counting here, e.g. joinedRDD.count() gives the total number of matched rows