spark setCassandraConf 未按预期工作
spark setCassandraConf is not working as expected
我正在使用 .setCassandraConf(c_options_conf) 设置 sparkSession 以连接 cassandra 集群,如下所示。
工作正常:
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.getOrCreate()
.setCassandraConf(c_options_conf)
如果我使用数据帧编写器对象保存 table 如下所示,它指向配置的集群并保存在 Cassandra 中非常好,如下所示
writeDfToCassandra(o_vals_df, key_space , "model_vals"); //working fine using o_vals_df.
但是如果如下所示,它指向本地主机而不是 cassandra 集群并且无法保存。
不工作:
import spark.implicits._
val sc = spark.sparkContext
val audit_df = sc.parallelize(Seq(LogCaseClass(columnFamilyName, status,
error_msg,currentDate,currentTimeStamp, updated_user))).saveToCassandra(keyspace, columnFamilyName);
它在尝试连接本地主机时抛出错误。
错误:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
这里有什么问题?为什么它指向默认的本地主机,即使 sparkSession 设置为 cassandra 集群并且早期的方法工作正常。
通过 spark.cassandra.connection.host
Spark 属性(而不是通过 setCassandraConf
!)设置 IP 适用于 RDD 和数据帧。这个 属性 可以在提交作业时从命令行设置,或者明确地设置(来自文档的示例):
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "192.168.123.10")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)
看看documentation for connector, including reference about existing configuration properties。
我们需要使用 SparkSession
的两个设置方法来设置配置,即 .config(conf)
和 .setCassandraConf(c_options_conf)
具有相同的值,如下所示
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.config("spark.dynamicAllocation.enabled",devProps.getString("spark.dynamicAllocation.enabled"))
.config("spark.executor.memory",devProps.getString("spark.executor.memory"))
.config("spark.executor.cores",devProps.getString("spark.executor.cores"))
.config("spark.executor.instances",devProps.getString("spark.executor.instances"))
.config(conf)
.getOrCreate()
.setCassandraConf(c_options_conf)
然后我会为 cassandra 工作最新 api 以及 RDD/DF Api.
我正在使用 .setCassandraConf(c_options_conf) 设置 sparkSession 以连接 cassandra 集群,如下所示。
工作正常:
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.getOrCreate()
.setCassandraConf(c_options_conf)
如果我使用数据帧编写器对象保存 table 如下所示,它指向配置的集群并保存在 Cassandra 中非常好,如下所示
writeDfToCassandra(o_vals_df, key_space , "model_vals"); //working fine using o_vals_df.
但是如果如下所示,它指向本地主机而不是 cassandra 集群并且无法保存。
不工作:
import spark.implicits._
val sc = spark.sparkContext
val audit_df = sc.parallelize(Seq(LogCaseClass(columnFamilyName, status,
error_msg,currentDate,currentTimeStamp, updated_user))).saveToCassandra(keyspace, columnFamilyName);
它在尝试连接本地主机时抛出错误。
错误:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
这里有什么问题?为什么它指向默认的本地主机,即使 sparkSession 设置为 cassandra 集群并且早期的方法工作正常。
通过 spark.cassandra.connection.host
Spark 属性(而不是通过 setCassandraConf
!)设置 IP 适用于 RDD 和数据帧。这个 属性 可以在提交作业时从命令行设置,或者明确地设置(来自文档的示例):
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "192.168.123.10")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)
看看documentation for connector, including reference about existing configuration properties。
我们需要使用 SparkSession
的两个设置方法来设置配置,即 .config(conf)
和 .setCassandraConf(c_options_conf)
具有相同的值,如下所示
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.config("spark.dynamicAllocation.enabled",devProps.getString("spark.dynamicAllocation.enabled"))
.config("spark.executor.memory",devProps.getString("spark.executor.memory"))
.config("spark.executor.cores",devProps.getString("spark.executor.cores"))
.config("spark.executor.instances",devProps.getString("spark.executor.instances"))
.config(conf)
.getOrCreate()
.setCassandraConf(c_options_conf)
然后我会为 cassandra 工作最新 api 以及 RDD/DF Api.