使用目录的 spark cassandra 连接器问题
spark cassandra connector problem using catalogs
我正在按照说明 found here 连接我的 spark 程序以从 Cassandra 读取数据。这是我配置 spark 的方式:
val configBuilder = SparkSession.builder
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
.config("spark.cassandra.connection.host", cassandraUrl)
.config("spark.cassandra.connection.port", 9042)
.config("spark.sql.catalog.myCatalogName", "com.datastax.spark.connector.datasource.CassandraCatalog")
根据文档,一旦完成,我应该能够像这样查询 Cassandra:
spark.sql("select * from myCatalogName.myKeyspace.myTable where myPartitionKey = something")
然而,当我这样做时,我收到以下错误消息:
mismatched input '.' expecting <EOF>(line 1, pos 43)
== SQL ==
select * from myCatalog.myKeyspace.myTable where myPartitionKey = something
----------------------------------^^^
当我尝试以下格式时,我成功地从 Cassandra 检索条目:
val frame = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myKeyspace", "table" -> "myTable"))
.load()
.filter(col("timestamp") > startDate && col("timestamp") < endDate)
但是,此查询需要执行完整的 table 扫描。 table 包含几百万个条目,我更愿意利用谓词下推功能,它似乎只能通过 SQL API.
我正在使用 spark-core_2.11:2.4.3、spark-cassandra-connector_2.11:2.5.0 和 Cassandra 3.11.6
谢谢!
目录 API 仅在尚未发布的 SCC 3.0 版中可用。它将与 Spark 3.0 版本一起发布,因此在 SCC 2.5.0 中不可用。因此,对于 2.5.0,您需要使用 create or replace temporary view...
显式注册 table,如 described in docs:
spark.sql("""CREATE TEMPORARY VIEW myTable
USING org.apache.spark.sql.cassandra
OPTIONS (
table "myTable",
keyspace "myKeyspace",
pushdown "true")""")
关于下推(它们对所有 Dataframe APIs、SQL、Scala、Python、...都相同)- 当您的 timestamp
是第一个聚类列。即使在那种情况下,典型的问题是您可能将 startDate
和 endDate
指定为字符串,而不是时间戳。您可以通过执行 frame.explain
来检查,并检查谓词是否被下推 - 它应该在谓词名称附近有 *
标记。
例如,
val data = spark.read.cassandraFormat("sdtest", "test").load()
val filtered = data.filter("ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")
val not_filtered = data.filter("ts >= '2019-03-10T14:41:34.373+0000' AND ts <= '2019-03-10T19:01:56.316+0000'")
第一个 filter
表达式将下推谓词,而第二个 (not_filtered
) 将需要完全扫描。
我正在按照说明 found here 连接我的 spark 程序以从 Cassandra 读取数据。这是我配置 spark 的方式:
val configBuilder = SparkSession.builder
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
.config("spark.cassandra.connection.host", cassandraUrl)
.config("spark.cassandra.connection.port", 9042)
.config("spark.sql.catalog.myCatalogName", "com.datastax.spark.connector.datasource.CassandraCatalog")
根据文档,一旦完成,我应该能够像这样查询 Cassandra:
spark.sql("select * from myCatalogName.myKeyspace.myTable where myPartitionKey = something")
然而,当我这样做时,我收到以下错误消息:
mismatched input '.' expecting <EOF>(line 1, pos 43)
== SQL ==
select * from myCatalog.myKeyspace.myTable where myPartitionKey = something
----------------------------------^^^
当我尝试以下格式时,我成功地从 Cassandra 检索条目:
val frame = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myKeyspace", "table" -> "myTable"))
.load()
.filter(col("timestamp") > startDate && col("timestamp") < endDate)
但是,此查询需要执行完整的 table 扫描。 table 包含几百万个条目,我更愿意利用谓词下推功能,它似乎只能通过 SQL API.
我正在使用 spark-core_2.11:2.4.3、spark-cassandra-connector_2.11:2.5.0 和 Cassandra 3.11.6
谢谢!
目录 API 仅在尚未发布的 SCC 3.0 版中可用。它将与 Spark 3.0 版本一起发布,因此在 SCC 2.5.0 中不可用。因此,对于 2.5.0,您需要使用 create or replace temporary view...
显式注册 table,如 described in docs:
spark.sql("""CREATE TEMPORARY VIEW myTable
USING org.apache.spark.sql.cassandra
OPTIONS (
table "myTable",
keyspace "myKeyspace",
pushdown "true")""")
关于下推(它们对所有 Dataframe APIs、SQL、Scala、Python、...都相同)- 当您的 timestamp
是第一个聚类列。即使在那种情况下,典型的问题是您可能将 startDate
和 endDate
指定为字符串,而不是时间戳。您可以通过执行 frame.explain
来检查,并检查谓词是否被下推 - 它应该在谓词名称附近有 *
标记。
例如,
val data = spark.read.cassandraFormat("sdtest", "test").load()
val filtered = data.filter("ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")
val not_filtered = data.filter("ts >= '2019-03-10T14:41:34.373+0000' AND ts <= '2019-03-10T19:01:56.316+0000'")
第一个 filter
表达式将下推谓词,而第二个 (not_filtered
) 将需要完全扫描。