Map a table of a Cassandra database using Spark and RDD
I have to map a table that stores the usage history of an application. The table has these tuples:
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
AppId always differs, because the table references many apps; date is expressed in the format dd/mm/yyyy hh/mm; cpuUsage and memoryUsage are expressed as percentages (%).
For example:
<3ghffh3t482age20304,230720142245,0.2,3,5>
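Since the date in the example row is a packed digit string rather than the separated dd/mm/yyyy hh/mm form, here is a minimal sketch of how such a value could be parsed. It assumes the layout is ddMMyyyyHHmm with no separators; the class name `UsageDate` and its `parse` helper are hypothetical and not part of the original code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class UsageDate {
    // Assumed layout of the packed date string: ddMMyyyyHHmm (no separators).
    private static final DateTimeFormatter PACKED =
            DateTimeFormatter.ofPattern("ddMMyyyyHHmm");

    // Parse a packed date such as "230720142245" into a LocalDateTime.
    static LocalDateTime parse(String packed) {
        return LocalDateTime.parse(packed, PACKED);
    }

    public static void main(String[] args) {
        // Date taken from the example row above.
        System.out.println(parse("230720142245")); // prints 2014-07-23T22:45
    }
}
```

Storing the column as a Cassandra timestamp instead of text would make this step unnecessary, but that is a schema choice the question does not make.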
I retrieve the data from Cassandra this way (small snippet):
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public static void main(String[] args) {
    Cluster cluster;
    Session session;
    // Connect to a local Cassandra node
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    session = cluster.connect();
    session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication "
            + "= {'class':'SimpleStrategy', 'replication_factor':3};");
    // appid is the partition key, date is the clustering column
    String createTableAppUsage = "CREATE TABLE IF NOT EXISTS foo.appusage"
            + "(appid text, date text, cpuusage double, memoryusage double, "
            + "PRIMARY KEY (appid, date)) " + "WITH CLUSTERING ORDER BY (date ASC);";
    session.execute(createTableAppUsage);
    // Use select to get the appusage's table rows
    ResultSet resultForAppUsage = session.execute("SELECT appid,cpuusage FROM foo.appusage");
    for (Row row : resultForAppUsage) {
        // cpuusage is a double column, so it is read with getDouble
        System.out.println("appid: " + row.getString("appid") + " cpuusage: " + row.getDouble("cpuusage"));
    }
    // Clean up the connection by closing it
    cluster.close();
}
So my problem now is to map the data as key/value pairs, <AppId,cpuusage>, and build a tuple that integrates with this code (the snippet does not work):
JavaPairRDD<String, Integer> saveTupleKeyValue =someStructureFromTakeData.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) {
return new Tuple2(x, y);
}
How can I use an RDD to map appId and cpuusage, and then reduce the result, e.g. to the entries with cpuusage > 50?
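To make the intended shape of the result concrete, here is a minimal sketch of the two steps (map each row to an <appId, cpuusage> pair, then keep only pairs above a threshold). It uses plain Java streams so it runs without a Spark dependency; it is not Spark code. The names `AppUsagePairs`, `UsageRow` and `pairsAbove` are hypothetical, and the value type is Double because the cpuusage column is declared as double.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class AppUsagePairs {
    // Hypothetical holder for one row of foo.appusage (only the two selected columns).
    static final class UsageRow {
        final String appId;
        final double cpuUsage;

        UsageRow(String appId, double cpuUsage) {
            this.appId = appId;
            this.cpuUsage = cpuUsage;
        }
    }

    // Map each row to an <appId, cpuUsage> pair, then keep the pairs above the threshold.
    static List<Map.Entry<String, Double>> pairsAbove(List<UsageRow> rows, double threshold) {
        return rows.stream()
                .<Map.Entry<String, Double>>map(r -> new SimpleEntry<>(r.appId, r.cpuUsage))
                .filter(p -> p.getValue() > threshold)
                .collect(Collectors.toList());
    }
}
```

With Spark's Java API the same two steps would correspond to `mapToPair` on a `JavaRDD` followed by `filter` on the resulting `JavaPairRDD<String, Double>`; how the rows get from Cassandra into the RDD is left open here.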
Any help?
Thanks in advance.