如何使用 spark cassandra 连接器连接到 1 个以上的 cassandra 主机
how to connect to more than 1 cassandra hosts using spark cassandra connector
我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,经过一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要连接到另外 1 个 cassandra 集群。
我看到一个用于连接到 cassandra 的 CassandraConnector class,但它使用 CassandraConnectorConf 对象创建一个对象,该对象带有很多我不知道的参数。
任何帮助都会有帮助
使用以下代码:
SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "<cassandraHost>");
CassandraConnector connector = CassandraConnector.apply(confForCassandra);
javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();
如果您想使用 Scala 和 Spark 连接到两个 Cassandra 集群,您可以使用以下代码:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
def twoClusterExample ( sc: SparkContext) = {
val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))
val rddFromClusterOne = {
// Sets connectorToClusterOne as default connection for everything in this code block
implicit val c = connectorToClusterOne
sc.cassandraTable("ks","tab")
}
{
//Sets connectorToClusterTwo as the default connection for everything in this code block
implicit val c = connectorToClusterTwo
rddFromClusterOne.saveToCassandra("ks","tab")
}
}
原始代码由 RusselSpitzer 在这里编写:https://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d
目前无法使用 Python 和 Spark 执行此操作。
我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,经过一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要连接到另外 1 个 cassandra 集群。
我看到一个用于连接到 cassandra 的 CassandraConnector class,但它使用 CassandraConnectorConf 对象创建一个对象,该对象带有很多我不知道的参数。
任何帮助都会有帮助
使用以下代码:
SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "<cassandraHost>");
CassandraConnector connector = CassandraConnector.apply(confForCassandra);
javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();
如果您想使用 Scala 和 Spark 连接到两个 Cassandra 集群,您可以使用以下代码:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
def twoClusterExample ( sc: SparkContext) = {
val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))
val rddFromClusterOne = {
// Sets connectorToClusterOne as default connection for everything in this code block
implicit val c = connectorToClusterOne
sc.cassandraTable("ks","tab")
}
{
//Sets connectorToClusterTwo as the default connection for everything in this code block
implicit val c = connectorToClusterTwo
rddFromClusterOne.saveToCassandra("ks","tab")
}
}
原始代码由 RusselSpitzer 在这里编写:https://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d
目前无法使用 Python 和 Spark 执行此操作。