Spark Cassandra connection through Java client
I want to connect to my Scylla DB/Cassandra from a Spark job and run lookup queries using the Java client. I tried the following:
import com.datastax.driver.core.Cluster
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .master("local[1]")
  .getOrCreate()

import spark.implicits._

val m = Map("John" -> 2)
val df = m.toSeq.toDF("first", "id")
df.show

// Open a connection per partition and look up each record
val vdf = df.mapPartitions(p => {
  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect("MyKeySpace")
  val res = p.map(record => {
    val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='${record.get(1)}' and first='${record.get(0)}'")
    val row = results.one()
    var scyllaRow: Person = null
    if (row != null) {
      scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
    }
    scyllaRow
  })
  session.close()
  cluster.close()
  res
})
vdf.show()
But I get a no-host-available exception (even though there is no connectivity problem; it works fine with the plain Java client):
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:210)
    at com.datastax.driver.core.RequestHandler.access00(RequestHandler.java:46)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:274)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:114)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:94)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    ... 27 more
Any help is appreciated.
Use CassandraConnector from com.datastax.spark.connector.cql; it will take care of session management for each partition:
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder.appName("ScyllaSparkClient")
    .config("spark.cassandra.connection.host", "localhost")
    .master("local[1]")
    .getOrCreate()

  import spark.implicits._

  val m = Map("John" -> 2)
  val df = m.toSeq.toDF("first", "id")
  df.show

  // CassandraConnector is serializable and handles session pooling on the executors
  val connector = CassandraConnector(spark.sparkContext.getConf)

  val vdf = df.mapPartitions(p => {
    connector.withSessionDo { session =>
      val res = p.map(record => {
        val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='${record.get(1)}' and first='${record.get(0)}'")
        val row = results.one()
        var scyllaRow: Person = null
        if (row != null) {
          scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
        }
        scyllaRow
      })
      res
    }
  })
  vdf.show()
}
It will work!
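Both snippets also reference a Person case class that is not shown in the question; presumably something along these lines (a minimal sketch inferred from the fields accessed above):

// Hypothetical case class matching the columns read above:
// id is stored as text in MyColumns and converted with .toInt,
// first and last are plain strings.
case class Person(id: Int, first: String, last: String)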
You need to use the Spark Cassandra Connector to connect to a Cassandra database from Spark.
The connector is available here -- https://github.com/datastax/spark-cassandra-connector. But since you are connecting to Scylla DB, you may want to use Scylla's fork of the connector. Cheers!
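For reference, once the connector is on the classpath you can also skip the manual session handling entirely and let Spark read the table through the connector's data source. A rough sketch (keyspace and table names taken from the question; the dependency version is an assumption, pick the one matching your Spark version):

// build.sbt (version is an assumption):
// libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.5.2"

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .config("spark.cassandra.connection.host", "localhost")
  .master("local[1]")
  .getOrCreate()

import spark.implicits._

// Read MyKeySpace.MyColumns as a DataFrame via the connector's data source
val columnsDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "MyKeySpace", "table" -> "MyColumns"))
  .load()

// The lookup from the question then becomes a plain DataFrame filter
// (id is compared as text, as in the original query)
columnsDf.filter($"id" === "2" && $"first" === "John").show()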