如何将列表拆分为多个分区并发送给执行者
how to split a list to multiple partitions and sent to executors
当我们如下使用spark从csv中读取数据给DB时,它会自动将数据拆分到多个分区并发送给executors
spark
.read
.option("delimiter", ",")
.option("header", "true")
.option("mergeSchema", "true")
.option("codec", properties.getProperty("sparkCodeC"))
.format(properties.getProperty("fileFormat"))
.load(inputFile)
目前,我有一个 id 列表:
[1,2,3,4,5,6,7,8,9,...1000]
我想做的是将这个列表拆分成多个分区发送给executors,在每个executor中,运行 the sql as
ids.foreach(id => {
select * from table where id = id
})
当我们从 cassandra 加载数据时,连接器将生成查询 sql 为:
select columns from table where Token(k) >= ? and Token(k) <= ?
这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个 table,我只是从 table 获取所有数据,其中 k (分区键)在 id 列表中。
table 架构为:
CREATE TABLE IF NOT EXISTS tab.events (
k int,
o text,
event text
PRIMARY KEY (k,o)
);
或者我如何使用预定义的 sql 语句使用 spark 从 cassandra 加载数据而不扫描整个 table?
您只需要使用joinWithCassandra
function来执行仅选择您操作所需的数据。但请注意,此功能只能通过 RDD API.
使用
像这样:
val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")
您需要确保 DataFrame 中的列名称与 Cassandra 中的分区键名称相匹配 - 有关详细信息,请参阅文档。
DataFrame 实现仅在 DSE 版本的 Spark Cassandra Connector 中可用,如 following blog post 中所述。
2020 年 9 月更新:Spark Cassandra Connector 2.5.0
中添加了对 Cassandra 加入的支持
当我们如下使用spark从csv中读取数据给DB时,它会自动将数据拆分到多个分区并发送给executors
spark
.read
.option("delimiter", ",")
.option("header", "true")
.option("mergeSchema", "true")
.option("codec", properties.getProperty("sparkCodeC"))
.format(properties.getProperty("fileFormat"))
.load(inputFile)
目前,我有一个 id 列表:
[1,2,3,4,5,6,7,8,9,...1000]
我想做的是将这个列表拆分成多个分区发送给executors,在每个executor中,运行 the sql as
ids.foreach(id => {
select * from table where id = id
})
当我们从 cassandra 加载数据时,连接器将生成查询 sql 为:
select columns from table where Token(k) >= ? and Token(k) <= ?
这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个 table,我只是从 table 获取所有数据,其中 k (分区键)在 id 列表中。
table 架构为:
CREATE TABLE IF NOT EXISTS tab.events (
k int,
o text,
event text
PRIMARY KEY (k,o)
);
或者我如何使用预定义的 sql 语句使用 spark 从 cassandra 加载数据而不扫描整个 table?
您只需要使用joinWithCassandra
function来执行仅选择您操作所需的数据。但请注意,此功能只能通过 RDD API.
像这样:
val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")
您需要确保 DataFrame 中的列名称与 Cassandra 中的分区键名称相匹配 - 有关详细信息,请参阅文档。
DataFrame 实现仅在 DSE 版本的 Spark Cassandra Connector 中可用,如 following blog post 中所述。
2020 年 9 月更新:Spark Cassandra Connector 2.5.0
中添加了对 Cassandra 加入的支持