Datastax spark cassandra 连接器与 RetryPolicy 将 DF 写入 cassandra table

Datastax spark cassandra connector with RetryPolicy to write DF to cassandra table

我正在尝试使用一致性级别 "EACH_QUORUM" 将 spark Dataframe 写入 cassandra。我的代码如下所示:

val sparkBuilder = SparkSession.builder().
  config(cassandraHostPropertyProperty, cassandraHosts).
  config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
  config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
  config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
  getOrCreate();

下面是写DF的代码:

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
      WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
    ))
    .save()

我想添加一个重试策略,这样如果其中一个数据中心出现故障,将降级一致性写入 LOCAL_QUORUM。

我知道 datastax 有一个 class MultipleRetryPolicy.scala 我应该扩展它,覆盖方法以添加自定义逻辑并在 cassandra conf 中使用它的实例。

如何将此策略应用于我的 sparksession 或保存操作?在 Scala 中有没有使用或不使用 RetryPolicy 来实现我的要求的任何其他方法?

你不想要 MultipleRetryPolicy,你在 DowngradingConsistencyRetryPolicy 之后,它不是 spark 驱动程序的一部分,所以除非你移植策略,否则将此作为驱动程序设置的一部分进行交给斯卡拉。

您可以做的是将查询执行包装在一个 try 中并捕获 UnavailableException 然后通过更改 output.consistency.level parameter.

以较低的一致性重试