通过 sparklyr 将 cassandra table 导入到 spark - 可能 select 只有一些列?

Importing cassandra table into spark via sparklyr - possible to select only some columns?

我一直在与 sparklyr 合作,将大型 cassandra 表引入 spark,用 R 注册这些表并对其进行 dplyr 操作。

我已经使用如下代码成功导入了 cassandra 表:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
         sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
       )

其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间 import/register,有些会导致内存溢出,即使有 6 个节点 运行 总共 60内核和 192GB 内存。但是,我通常只需要每个 cassandra 数据库中的一些列。

我的问题是:

  1. 是否可以在 import/registration 上过滤 cassandra 数据库,以便它只导入一些列或在主键上过滤(即通过传递 SQL / CQL 输入查询,例如 SELECT name FROM cass_table WHERE id = 5)?
  2. 在上面的代码中,这样的查询会去哪里,语法采用什么形式?

我尝试在选项列表中添加这样的查询作为附加选项,即:

list(. . . , select = "id")

以及在 %>% invoke("load") 之前将其作为单独的管道调用,即:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", s"select id from cass_table") %>%

但是这些都不行。有什么建议吗?

您可以跳过急切缓存和 select 个感兴趣的列:

session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>% 
  invoke("read") %>% 
  invoke("format", "org.apache.spark.sql.cassandra") %>% 
  invoke("options", as.environment(list(keyspace="test"))) %>% 
  invoke("load") %>% 
  # We use select(col: String, cols* String) so the first column
  # has to be used separately. If you want only one column the third argument
  # has to be an empty list 
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")

如果您使用谓词可以显着减少获取数据集的数量 pushdown 选项到 "true"(默认)并使用 filter before缓存。

如果您想传递更复杂的查询,请注册临时视图和 sql 方法:

session %>%
  invoke("read") %>% 
  ...
  invoke("load") %>% 
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>% 
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")