将 columnNames 动态传递给 cassandraTable().select()
Pass columnNames dynamically to cassandraTable().select()
我正在 运行 时从文件中读取查询并在 SPark+Cassandra 环境中执行它。
我正在执行:
sparkContext.cassandraTable.("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")
文件中的查询:
select col1, col2, col3
from keyspaceName.colFamilyName
where somecondition = true
此处的 Col1、col2、col3 可能因从文件中解析的查询而异。
问题:
如何从查询中选择 columnName 并将它们传递给 select() 和 运行time。
我试过很多方法了:
1. 做过最愚蠢的事情(这显然引发了错误)-
var str = "col1,col2,col3"
var selectStmt = str.split("\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)
欢迎任何想法。
旁注:
1. 我不想使用 cassandraCntext 因为它将在下一个版本中被删除/删除 (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. 我在
- 一种。斯卡拉 2.11
- b。 spark-cassandra-connector_2.11:1.6.0-M1
- C。火花 1.6
使用 Cassandra 连接器
您的用例听起来您确实想要使用 CassandraConnector
对象。这些使您可以直接访问每个 ExecutorJVM 会话池,非常适合仅执行随机查询。这最终将比为每个查询创建一个 RDD 更有效。
这看起来像
rddOfStatements.mapPartitions( it =>
CassandraConnector.withSessionDo { session =>
it.map(statement =>
session.execute(statement))})
但您很可能希望使用 executeAsync
并单独处理期货以获得更好的性能。
以编程方式指定 cassandraTable 中的列
select
方法需要 ColumnRef*
,这意味着您需要传入一定数量的 ColumnRef
。通常有一个从 String
--> ColumnRef
的隐式转换,这就是为什么你可以只传入字符串的可变参数。
这里有点复杂,因为我们想传递另一种类型的 var args,所以我们最终得到了双重隐含,而 Scala 不喜欢那样。
所以我们将 ColumnName
对象作为可变参数 (:_*)
========================================
Keyspace: test
========================================
Table: dummy
----------------------------------------
- id : java.util.UUID (partition key column)
- txt : String
val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)
//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})
//Only use the first column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).take(1):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})
//Only use the last column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).takeRight(1):_*)
.collect
Array(CassandraRow{txt: hello world})
我正在 运行 时从文件中读取查询并在 SPark+Cassandra 环境中执行它。
我正在执行:
sparkContext.cassandraTable.("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")
文件中的查询:
select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true
此处的 Col1、col2、col3 可能因从文件中解析的查询而异。
问题:
如何从查询中选择 columnName 并将它们传递给 select() 和 运行time。
我试过很多方法了:
1. 做过最愚蠢的事情(这显然引发了错误)-
var str = "col1,col2,col3"
var selectStmt = str.split("\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)
欢迎任何想法。
旁注:
1. 我不想使用 cassandraCntext 因为它将在下一个版本中被删除/删除 (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. 我在
- 一种。斯卡拉 2.11
- b。 spark-cassandra-connector_2.11:1.6.0-M1
- C。火花 1.6
使用 Cassandra 连接器
您的用例听起来您确实想要使用 CassandraConnector
对象。这些使您可以直接访问每个 ExecutorJVM 会话池,非常适合仅执行随机查询。这最终将比为每个查询创建一个 RDD 更有效。
这看起来像
rddOfStatements.mapPartitions( it =>
CassandraConnector.withSessionDo { session =>
it.map(statement =>
session.execute(statement))})
但您很可能希望使用 executeAsync
并单独处理期货以获得更好的性能。
以编程方式指定 cassandraTable 中的列
select
方法需要 ColumnRef*
,这意味着您需要传入一定数量的 ColumnRef
。通常有一个从 String
--> ColumnRef
的隐式转换,这就是为什么你可以只传入字符串的可变参数。
这里有点复杂,因为我们想传递另一种类型的 var args,所以我们最终得到了双重隐含,而 Scala 不喜欢那样。
所以我们将 ColumnName
对象作为可变参数 (:_*)
========================================
Keyspace: test
========================================
Table: dummy
----------------------------------------
- id : java.util.UUID (partition key column)
- txt : String
val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)
//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})
//Only use the first column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).take(1):_*)
.collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})
//Only use the last column
sc.cassandraTable("test","dummy")
.select(columns.map(ColumnName(_)).takeRight(1):_*)
.collect
Array(CassandraRow{txt: hello world})