Error reading Cassandra TTL and WRITETIME with Spark 3.0

Although the latest spark-cassandra-connector from DataStax states that it supports reading/writing TTL and WRITETIME, I am still getting a SQL undefined function error.

Using Databricks on a 9.1 LTS ML cluster (includes Apache Spark 3.1.2, Scala 2.12) with the library com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0 and the CassandraSparkExtensions Spark configuration. CQL version 3.4.5.

spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

Confirmed the configuration from a notebook cell:

spark.conf.get("spark.sql.extensions")

Out[7]: 'com.datastax.spark.connector.CassandraSparkExtensions'

# Cassandra connection configs using Data Source API V2
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.host", "10.1.4.4")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.port", "9042")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.username", dbutils.secrets.get(scope = "myScope", key = "CassUsername"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.password", dbutils.secrets.get(scope = "myScope", key = "CassPassword")) 
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.enabled", True)
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.path", "/dbfs/user/client-truststore.jks")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.password", dbutils.secrets.get("key-vault-secrets", "cassTrustPassword"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.dse.continuous_paging_enabled", False) 

# catalog name will be "cassandrauat" for Cassandra
spark.conf.set("spark.sql.catalog.cassandrauat", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cassandrauat.prop", "key")
spark.conf.set("spark.sql.defaultCatalog", "cassandrauat") # will override Spark to use Cassandra for all databases
%sql 
select id, did, ts, val, ttl(val) 
from cassandrauat.myKeyspace.myTable

Error in SQL statement: AnalysisException: Undefined function: 'ttl'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 25

When the same CQL query is run directly against the Cassandra cluster, it returns a result.

Any help on why CassandraSparkExtensions is not loading would be appreciated.

Adding the full stack trace for the NoSuchMethodError that occurs after pre-loading the library:

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.replaceMetadata(CassandraMetadataFunctions.scala:152)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply.$anonfun$applyOrElse(CassandraMetadataFunctions.scala:187)
    at scala.collection.immutable.Stream.foldLeft(Stream.scala:549)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply.applyOrElse(CassandraMetadataFunctions.scala:186)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply.applyOrElse(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:90)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:221)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:221)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute(RuleExecutor.scala:218)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$adapted(RuleExecutor.scala:210)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:210)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:271)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:264)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:191)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:109)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck(Analyzer.scala:246)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:347)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:245)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed(QueryExecution.scala:96)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:180)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows(Dataset.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.$anonfun$sql(SparkSession.scala:689)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:684)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.$anonfun$executeSql(SQLDriverLocal.scala:91)
    at scala.collection.TraversableLike.$anonfun$map(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:37)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:129)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

If you just added the Spark Cassandra Connector via the cluster UI, it won't work - the reason is that libraries are installed onto the cluster after Spark has already started, so the class referenced in spark.sql.extensions is not found.
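
A quick way to confirm whether the extensions were actually injected (a sketch, reusing the failing query from the question): if CassandraSparkExtensions was on the classpath when the session was created, the analyzer resolves ttl()/writetime(); otherwise it raises the same AnalysisException shown above.

# Sketch: check whether the Cassandra metadata functions resolve in this session
try:
    spark.sql("select ttl(val) from cassandrauat.myKeyspace.myTable limit 1")
    print("CassandraSparkExtensions is active")
except Exception as e:
    print("CassandraSparkExtensions not active:", e)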

To fix this you need to get the jar file onto the cluster nodes before Spark starts. You can do that with a cluster init script that downloads the jar directly, with something like this (but it downloads a separate copy on every node):

#!/bin/bash

wget -q -O /databricks/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
  https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.1.0/spark-cassandra-connector-assembly_2.12-3.1.0.jar

Or, better, download the assembly jar once, upload it to DBFS, and then copy it from DBFS into the target directory (for example, after uploading it to /FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar):

#!/bin/bash

cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
  /databricks/jars/
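
Either variant of the script can be written to DBFS from a notebook and then referenced under the cluster's "Init Scripts" settings (a sketch; the script path below is just an example):

# Sketch: write the init script to DBFS (example path), then add it as a cluster init script
dbutils.fs.put(
    "dbfs:/databricks/scripts/copy-cassandra-connector.sh",
    """#!/bin/bash
cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar /databricks/jars/
""",
    True,  # overwrite if the file already exists
)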

Update (13.11.2021): SCC 3.1.0 is not fully compatible with Spark 3.2.0 (parts of which are already in DBR 9.1). See SPARKC-670 for details.