RDD joinWithCassandraTable

RDD joinWithCassandraTable

任何人都可以帮助我解决以下问题。 我有一个包含 5 列的 RDD。我想加入 Cassandra 中的 table。 我知道有一种方法可以使用 "joinWithCassandraTable"

我在某处看到了使用它的语法。 句法: RDD.joinWithCassandraTable(KEYSPACE, tablename, SomeColumns("cola","colb")) .on(SomeColumns("colc"))

谁能把正确的语法发给我?

我想真正知道在哪里提及 table 的列名,这是要加入的键。

JoinWithCassandraTable 的工作原理是仅从 C* 中提取与您的 RDD 条目匹配的分区键,因此它仅适用于分区键。

文档在这里 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable

并且 API 文档在这里

http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.0-M2/spark-cassandra-connector/#com.datastax.spark.connector.RDDFunctions

jWCT table方法可以在没有流利api的情况下使用,方法是在方法

中指定所有参数
def joinWithCassandraTable[R](
  keyspaceName: String, 
  tableName: String, 
  selectedColumns: ColumnSelector = AllColumns, 
  joinColumns: ColumnSelector = PartitionKeyColumns)

不过流利的api也可以

joinWithCassandraTable[R](keyspace, tableName).select(AllColumns).on(PartitionKeyColumns)

这两个调用是等价的

你的例子

RDD.joinWithCassandraTable(KEYSPACE, tablename, SomeColumns("cola","colb")) .on(SomeColumns("colc"))

使用来自 RDD 的对象与 tablenamecolc 连接,并且仅 returns colacolb 作为连接结果。

使用以下语法加入 cassandra

joinedData = rdd.joinWithCassandraTable(keyspace,table).on(partitionKeyName).select(Column Names)

看起来像这样,

joinedData = rdd.joinWithCassandraTable(keyspace,table).on('emp_id').select('emp_name', 'emp_city')