Spark 批处理在 2 个 cassandra 集群之间迁移数据

Spark batch to migrate data between 2 cassandra clusters

我正在使用 spark 将一些数据从一个 cassandra table 移动到另一个集群上的另一个 cassandra table。

我为其中一个源集群指定了 cassandra 配置,如下所示:

/*
spark.cassandra.connection.host: 
spark.cassandra.connection.port:
spark.cassandra.auth.username:
spark.cassandra.auth.password:
spark.cassandra.connection.ssl.clientAuth.enabled: true
spark.cassandra.connection.ssl.enabled: true
spark.cassandra.connection.ssl.trustStore.path: 
spark.cassandra.connection.ssl.trustStore.password: 
spark.cassandra.connection.timeout_ms: */

SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

Dataset<Row> df = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .options(config.getSourceTable())
            .load();
df.show();

// *** How/Where do I specify cassandra config in destination cluster? ***
df.write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .options(destinationTbl);

How/Where 我是否在目标集群中指定 cassandra 配置(Java 推荐)?

谢谢!

我还没有测试过,但是基于 Russel Spitzer's blog post 你可以做以下事情(没有在 Java 中测试过,但是应该可以):

  • 设置 2 个配置选项(或在创建 spark 实例时添加它们):
spark.setConf("ClusterSource/spark.cassandra.connection.host", "127.0.0.1");
spark.setConf("ClusterDestination/spark.cassandra.connection.host", "127.0.0.2");
  • 添加到options调用对应集群的名称作为cluster条目。

P.S。另外,请记住,如果您需要迁移数据并在数据上保持 WriteTime and/or TTL,那么您将需要使用 RDD API,因为 DataFrame API.

我有一个类似的用例,但在我的情况下,由于某些连接器问题,我无法使用 Alex 建议的方法建立到第二个集群的连接。所以,我不得不将这个 DataFrame 转换为 RDD 并使用 RDD 方法将其写入第二个 Cassandra Cluster

将所有 Cassandra 连接器详细信息传递到另一个 sparkConfig 文件并使用 CassandraConnector 解析它。

{    
val cluster: CassandraConnector = CassandraConnector(sparkConfig)

      implicit val c: CassandraConnector = cluster

      dataFrame
        .rdd
        .saveToCassandra(keySpaceName, tableName, SomeColumns(ListOfColumns)
}