Parse Spark RDD after Cassandra join

I want to read parquet files and join them with some columns in Cassandra. I am able to do the join and get back an RDD, but I am not able to parse the resulting RDD. Here are more details:
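For context, cs_cube2_2_6 below is the DataFrame read from parquet, along these lines (the path is a placeholder):

val cs_cube2_2_6 = spark.read.parquet("/path/to/parquet")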

case class IP(
  key: String, key2: String, key3: String, key4: String, key5: String,
  key6: String, key7: String, key8: String, key9: String, key10: String,
  key11: String, key12: String, key13: String, key14: String, key15: String,
  column1: String, column2: String, column3: String, column4: String,
  column5: String, value1: String
)

val a = cs_cube2_2_6.rdd.map(p => IP(
  p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString,
  p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString,
  p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString,
  p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString,
  p(20).toString
))


val joinWithRDD = a
  .joinWithCassandraTable("key", "tbl")
  .on(SomeColumns("key", "key2", "key3", "key4", "key5", "key6", "key7", "key8",
    "key9", "key10", "key11", "key12", "key13", "key14", "key15"))
  .select("value1")

scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19

The schema of the resulting RDD is shown above, and its output looks like this:

(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})

(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})

I am not sure how to parse this RDD. I want to sum the last column of IP and the value1 column of the CassandraRow.

Let me know if more details are needed, and thanks for the help.

Here is the Cassandra table schema:

CREATE TABLE aa.tbl (
key text,
key2 text,
key3 text,
key4 text,
key5 text,
key6 text,
key7 text,
key8 text,
key9 text,
key10 text,
key11 text,
key12 text,
key13 text,
key14 text,
key15 text,
column1 text,
column2 text,
column3 text,
column4 text,
column5 text,
value1 text,
value2 blob,
value3 text,
PRIMARY KEY ((key, key2, key3, key4, key5, key6, key7, key8, key9, key10, key11, key12, key13, key14, key15), column1, column2, column3, column4, column5)
) WITH CLUSTERING ORDER BY (column1 ASC, column2 ASC, column3 ASC, column4 ASC, column5 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';

You need to do something like this (I haven't checked the code, but it should work with small tweaks - I assume that value1 has integer type in Cassandra):

joinWithRDD.map { case (ip, row) =>
  val newVal = ip.value1.toInt + row.getInt("value1")
  // copy keeps every other field of ip and replaces only value1
  ip.copy(value1 = newVal.toString)
}
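Note that in the schema above value1 is actually declared as text, not int, so if getInt does not convert it for you, read it with row.getString("value1") and call toInt on the result.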

joinWithCassandraTable returns a tuple with your data as _1, and the data found in Cassandra as _2. When accessing the Cassandra data you can use getter functions such as getInt, getString, etc., or you can map the Row into a case class, as described in the documentation for the Spark Cassandra Connector.
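For the second approach, here is a rough sketch that reads the Cassandra side directly into a case class instead of a generic CassandraRow (ValueOnly is a made-up name, and the snippet is untested):

import com.datastax.spark.connector._

// Hypothetical target class for the single selected column.
case class ValueOnly(value1: String)

// Same join as above, but each matching Cassandra row is materialized
// as a ValueOnly instead of a CassandraRow.
val typedJoin = a
  .joinWithCassandraTable[ValueOnly]("key", "tbl")
  .on(SomeColumns("key", "key2", "key3", "key4", "key5", "key6", "key7", "key8",
    "key9", "key10", "key11", "key12", "key13", "key14", "key15"))
  .select("value1")

// value1 is text on both sides, so convert before summing.
val sums = typedJoin.map { case (ip, v) => ip.value1.toInt + v.value1.toInt }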

I was able to make this work. Thanks, Alex, for the input.

val b = joinWithRDD.map { case (ip, row) =>
  // value1 is text, so convert both sides to Int to sum rather than concatenate
  ip.copy(value1 = (ip.value1.toInt + row.getString("value1").toInt).toString)
}