将 columnNames 动态传递给 cassandraTable().select()

Pass columnNames dynamically to cassandraTable().select()

我正在 运行 时从文件中读取查询并在 SPark+Cassandra 环境中执行它。

我正在执行:
sparkContext.cassandraTable.("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

文件中的查询:

select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true

此处的 Col1、col2、col3 可能因从文件中解析的查询而异。

问题:
如何从查询中选择 columnName 并将它们传递给 select() 和 运行time。

我试过很多方法了:
1. 做过最愚蠢的事情(这显然引发了错误)-

var str = "col1,col2,col3"
var selectStmt = str.split("\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)

欢迎任何想法。

旁注:
1. 我不想使用 cassandraCntext 因为它将在下一个版本中被删除/删除 (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. 我在
- 一种。斯卡拉 2.11
- b。 spark-cassandra-connector_2.11:1.6.0-M1
- C。火花 1.6

使用 Cassandra 连接器

您的用例听起来您确实想要使用 CassandraConnector 对象。这些使您可以直接访问每个 ExecutorJVM 会话池,非常适合仅执行随机查询。这最终将比为每个查询创建一个 RDD 更有效。

这看起来像

rddOfStatements.mapPartitions( it => 
  CassandraConnector.withSessionDo { session => 
    it.map(statement => 
      session.execute(statement))})

但您很可能希望使用 executeAsync 并单独处理期货以获得更好的性能。

以编程方式指定 cassandraTable 中的列

select 方法需要 ColumnRef*,这意味着您需要传入一定数量的 ColumnRef。通常有一个从 String --> ColumnRef 的隐式转换,这就是为什么你可以只传入字符串的可变参数。

这里有点复杂,因为我们想传递另一种类型的 var args,所以我们最终得到了双重隐含,而 Scala 不喜欢那样。

所以我们将 ColumnName 对象作为可变参数 (:_*)

========================================
 Keyspace: test
========================================
 Table: dummy
----------------------------------------
 - id                      : java.util.UUID                                                                   (partition key column)
 - txt                     : String


val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)

//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)):_*)
  .collect      

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})

//Only use the first column
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).take(1):_*)
  .collect

Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})

//Only use the last column        
sc.cassandraTable("test","dummy")
  .select(columns.map(ColumnName(_)).takeRight(1):_*)
  .collect

Array(CassandraRow{txt: hello world})