如何使用 spark cassandra 连接器连接到 1 个以上的 cassandra 主机

how to connect to more than 1 cassandra hosts using spark cassandra connector

我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,经过一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要连接到另外 1 个 cassandra 集群。

我看到一个用于连接到 cassandra 的 CassandraConnector class,但它使用 CassandraConnectorConf 对象创建一个对象,该对象带有很多我不知道的参数。

任何帮助都会有帮助

使用以下代码:

SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "<cassandraHost>");

CassandraConnector connector = CassandraConnector.apply(confForCassandra);

javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();

如果您想使用 Scala 和 Spark 连接到两个 Cassandra 集群,您可以使用以下代码:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample ( sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","tab")
  }

  {
    //Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","tab")
  }

} 

原始代码由 RusselSpitzer 在这里编写:https://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d

目前无法使用 Python 和 Spark 执行此操作。