如何将列表拆分为多个分区并发送给执行者

how to split a list to multiple partitions and sent to executors

当我们如下使用spark从csv中读取数据给DB时,它会自动将数据拆分到多个分区并发送给executors

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

目前,我有一个 id 列表:

[1,2,3,4,5,6,7,8,9,...1000]

我想做的是将这个列表拆分成多个分区发送给executors,在每个executor中,运行 the sql as

ids.foreach(id => {    
select * from table where id = id
})

当我们从 cassandra 加载数据时,连接器将生成查询 sql 为:

select columns from table where Token(k) >= ? and Token(k) <= ? 

这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个 table,我只是从 table 获取所有数据,其中 k (分区键)在 id 列表中。

table 架构为:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text
    PRIMARY KEY (k,o)
);

或者我如何使用预定义的 sql 语句使用 spark 从 cassandra 加载数据而不扫描整个 table?

您只需要使用joinWithCassandra function来执行仅选择您操作所需的数据。但请注意,此功能只能通过 RDD API.

使用

像这样:

val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")

您需要确保 DataFrame 中的列名称与 Cassandra 中的分区键名称相匹配 - 有关详细信息,请参阅文档。

DataFrame 实现仅在 DSE 版本的 Spark Cassandra Connector 中可用,如 following blog post 中所述。

2020 年 9 月更新:Spark Cassandra Connector 2.5.0

中添加了对 Cassandra 加入的支持