
Error to write dataframe in Cassandra table on Amazon Keyspaces

I'm trying to write a dataframe to AWS Keyspaces, but I'm getting the following message:

Stack trace:

dfExploded.write.cassandraFormat(table = "table", keyspace = "hub").mode(SaveMode.Append).save()
21/08/18 21:45:18 WARN DefaultTokenFactoryRegistry: [s0] Unsupported partitioner 'com.amazonaws.cassandra.DefaultPartitioner', token map will be empty.
java.lang.AssertionError: assertion failed: There are no contact points in the given set of hosts
  at scala.Predef$.assert(Predef.scala:223)
  at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy$.determineDataCenter(LocalNodeFirstLoadBalancingPolicy.scala:195)
  at com.datastax.spark.connector.cql.CassandraConnector$.$anonfun$dataCenterNodes(CassandraConnector.scala:192)
  at scala.Option.getOrElse(Option.scala:189)
  at com.datastax.spark.connector.cql.CassandraConnector$.dataCenterNodes(CassandraConnector.scala:192)
  at com.datastax.spark.connector.cql.CassandraConnector$.alternativeConnectionConfigs(CassandraConnector.scala:207)
  at com.datastax.spark.connector.cql.CassandraConnector$.$anonfun$sessionCache(CassandraConnector.scala:169)
  at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:34)
  at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69)
  at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57)
  at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:89)
  at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
  at com.datastax.spark.connector.datasource.CassandraCatalog$.com$datastax$spark$connector$datasource$CassandraCatalog$$getMetadata(CassandraCatalog.scala:455)
  at com.datastax.spark.connector.datasource.CassandraCatalog$.getTableMetaData(CassandraCatalog.scala:421)
  at org.apache.spark.sql.cassandra.DefaultSource.getTable(DefaultSource.scala:68)
  at org.apache.spark.sql.cassandra.DefaultSource.inferSchema(DefaultSource.scala:72)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at org.apache.spark.sql.DataFrameWriter.getTable(DataFrameWriter.scala:339)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)

Spark submit:

spark-submit --deploy-mode cluster --master yarn  \
--conf=spark.cassandra.connection.port="9142" \
--conf=spark.cassandra.connection.host="cassandra.sa-east-1.amazonaws.com" \
--conf=spark.cassandra.auth.username="BUU" \
--conf=spark.cassandra.auth.password="123456789" \
--conf=spark.cassandra.connection.ssl.enabled="true" \
--conf=spark.cassandra.connection.ssl.trustStore.path="cassandra_truststore.jks" \
--conf=spark.cassandra.connection.ssl.trustStore.password="123456"

Connecting via cqlsh works fine, but I get this error in Spark.

As the error states, the problem is that AWS Keyspaces uses a partitioner (com.amazonaws.cassandra.DefaultPartitioner) that is not supported by the Spark Cassandra connector.
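As a quick check, you can ask the cluster which partitioner it reports through cqlsh (the output shown is illustrative of a Keyspaces endpoint, not taken from your session):

```sql
-- Ask the endpoint which partitioner it advertises to drivers.
SELECT partitioner FROM system.local;

--  partitioner
-- --------------------------------------------
--  com.amazonaws.cassandra.DefaultPartitioner
```

If this returns com.amazonaws.cassandra.DefaultPartitioner, the Spark connector will fail exactly as in the stack trace above, since it cannot build a token map for that partitioner.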

There isn't much public documentation on what the underlying database of AWS Keyspaces is, so I've always suspected that Keyspaces puts a CQL API engine in front so it "looks" like Cassandra, but that it may be backed by something else, such as DynamoDB. I'd be very happy to be corrected by someone from AWS so I can put that to bed.

The default Cassandra partitioner is Murmur3Partitioner, and it is the only recommended one. Older partitioners such as RandomPartitioner and ByteOrderedPartitioner are supported only for backward compatibility and should never be used for new clusters.

Finally, we don't test the Spark connector against AWS Keyspaces, so be prepared for plenty of surprises. Cheers!

To read and write data between Keyspaces and Apache Spark using the open-source Spark Cassandra Connector, you just need to update the partitioner for your Keyspaces account.
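As a sketch of that change (this is the statement the AWS documentation describes; run it through cqlsh against your Keyspaces endpoint, then restart your Spark session so the connector picks up the new value):

```sql
-- Switch the partitioner Keyspaces reports to the one the
-- Spark Cassandra connector understands.
UPDATE system.local
SET partitioner = 'org.apache.cassandra.dht.Murmur3Partitioner'
WHERE key = 'local';

-- Verify the change took effect.
SELECT partitioner FROM system.local;
```

Note this only changes the partitioner Keyspaces advertises to drivers for your account in that Region; it does not alter how your data is stored.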

文档:https://docs.aws.amazon.com/keyspaces/latest/devguide/spark-integrating.html