通过 sparklyr 将 cassandra table 导入到 spark - 可能 select 只有一些列?
Importing cassandra table into spark via sparklyr - possible to select only some columns?
我一直在与 sparklyr
合作,将大型 cassandra 表引入 spark,用 R 注册这些表并对其进行 dplyr
操作。
我已经使用如下代码成功导入了 cassandra 表:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
sc, "org.apache.spark.sql.cassandra", "format",
list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
invoke("load")
# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
)
其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间 import/register,有些会导致内存溢出,即使有 6 个节点 运行 总共 60内核和 192GB 内存。但是,我通常只需要每个 cassandra 数据库中的一些列。
我的问题是:
- 是否可以在 import/registration 上过滤 cassandra 数据库,以便它只导入一些列或在主键上过滤(即通过传递
SQL
/ CQL
输入查询,例如 SELECT name FROM cass_table WHERE id = 5
)?
- 在上面的代码中,这样的查询会去哪里,语法采用什么形式?
我尝试在选项列表中添加这样的查询作为附加选项,即:
list(. . . , select = "id")
以及在 %>% invoke("load")
之前将其作为单独的管道调用,即:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", s"select id from cass_table") %>%
但是这些都不行。有什么建议吗?
您可以跳过急切缓存和 select 个感兴趣的列:
session <- spark_session(sc)
# Some columns to select
cols <- list("x", "y", "z")
cass_df <- session %>%
invoke("read") %>%
invoke("format", "org.apache.spark.sql.cassandra") %>%
invoke("options", as.environment(list(keyspace="test"))) %>%
invoke("load") %>%
# We use select(col: String, cols* String) so the first column
# has to be used separately. If you want only one column the third argument
# has to be an empty list
invoke("select", cols[[1]], cols[2:length(cols)]) %>%
# Standard lazy cache if you need one
invoke("cache")
如果您使用谓词可以显着减少获取数据集的数量 pushdown
选项到 "true"
(默认)并使用 filter
before缓存。
如果您想传递更复杂的查询,请注册临时视图和 sql
方法:
session %>%
invoke("read") %>%
...
invoke("load") %>%
invoke("createOrReplaceTempView", "some_name")
cass_df <- session %>%
invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
invoke("cache")
我一直在与 sparklyr
合作,将大型 cassandra 表引入 spark,用 R 注册这些表并对其进行 dplyr
操作。
我已经使用如下代码成功导入了 cassandra 表:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
sc, "org.apache.spark.sql.cassandra", "format",
list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
invoke("load")
# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
)
其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间 import/register,有些会导致内存溢出,即使有 6 个节点 运行 总共 60内核和 192GB 内存。但是,我通常只需要每个 cassandra 数据库中的一些列。
我的问题是:
- 是否可以在 import/registration 上过滤 cassandra 数据库,以便它只导入一些列或在主键上过滤(即通过传递
SQL
/CQL
输入查询,例如SELECT name FROM cass_table WHERE id = 5
)? - 在上面的代码中,这样的查询会去哪里,语法采用什么形式?
我尝试在选项列表中添加这样的查询作为附加选项,即:
list(. . . , select = "id")
以及在 %>% invoke("load")
之前将其作为单独的管道调用,即:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", s"select id from cass_table") %>%
但是这些都不行。有什么建议吗?
您可以跳过急切缓存和 select 个感兴趣的列:
session <- spark_session(sc)
# Some columns to select
cols <- list("x", "y", "z")
cass_df <- session %>%
invoke("read") %>%
invoke("format", "org.apache.spark.sql.cassandra") %>%
invoke("options", as.environment(list(keyspace="test"))) %>%
invoke("load") %>%
# We use select(col: String, cols* String) so the first column
# has to be used separately. If you want only one column the third argument
# has to be an empty list
invoke("select", cols[[1]], cols[2:length(cols)]) %>%
# Standard lazy cache if you need one
invoke("cache")
如果您使用谓词可以显着减少获取数据集的数量 pushdown
选项到 "true"
(默认)并使用 filter
before缓存。
如果您想传递更复杂的查询,请注册临时视图和 sql
方法:
session %>%
invoke("read") %>%
...
invoke("load") %>%
invoke("createOrReplaceTempView", "some_name")
cass_df <- session %>%
invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
invoke("cache")