Datastax spark cassandra 连接器与 RetryPolicy 将 DF 写入 cassandra table
Datastax spark cassandra connector with RetryPolicy to write DF to cassandra table
我正在尝试使用一致性级别 "EACH_QUORUM" 将 spark Dataframe 写入 cassandra。我的代码如下所示:
val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();
下面是写DF的代码:
df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()
我想添加一个重试策略,这样如果其中一个数据中心出现故障,将降级一致性写入 LOCAL_QUORUM。
我知道 datastax 有一个 class MultipleRetryPolicy.scala 我应该扩展它,覆盖方法以添加自定义逻辑并在 cassandra conf 中使用它的实例。
如何将此策略应用于我的 sparksession 或保存操作?在 Scala 中有没有使用或不使用 RetryPolicy 来实现我的要求的任何其他方法?
你不想要 MultipleRetryPolicy
,你在 DowngradingConsistencyRetryPolicy 之后,它不是 spark 驱动程序的一部分,所以除非你移植策略,否则将此作为驱动程序设置的一部分进行交给斯卡拉。
您可以做的是将查询执行包装在一个 try 中并捕获 UnavailableException
然后通过更改 output.consistency.level parameter.
以较低的一致性重试
我正在尝试使用一致性级别 "EACH_QUORUM" 将 spark Dataframe 写入 cassandra。我的代码如下所示:
val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();
下面是写DF的代码:
df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()
我想添加一个重试策略,这样如果其中一个数据中心出现故障,将降级一致性写入 LOCAL_QUORUM。
我知道 datastax 有一个 class MultipleRetryPolicy.scala 我应该扩展它,覆盖方法以添加自定义逻辑并在 cassandra conf 中使用它的实例。
如何将此策略应用于我的 sparksession 或保存操作?在 Scala 中有没有使用或不使用 RetryPolicy 来实现我的要求的任何其他方法?
你不想要 MultipleRetryPolicy
,你在 DowngradingConsistencyRetryPolicy 之后,它不是 spark 驱动程序的一部分,所以除非你移植策略,否则将此作为驱动程序设置的一部分进行交给斯卡拉。
您可以做的是将查询执行包装在一个 try 中并捕获 UnavailableException
然后通过更改 output.consistency.level parameter.