How can I use directJoin with spark (scala)?
I am trying to use directJoin with the partition keys, but when I run the job it does not use directJoin. I would like to understand whether I am doing something wrong. Here is the code I am using:
Configuration settings:
val sparkConf: SparkConf = new SparkConf()
  .set(
    "spark.sql.extensions",
    "com.datastax.spark.connector.CassandraSparkExtensions"
  )
  .set(
    "spark.sql.catalog.CassandraCommercial",
    "com.datastax.spark.connector.datasource.CassandraCatalog"
  )
  .set(
    "spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.host",
    Settings.cassandraServerAddress
  )
  .set(
    "spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.username",
    Settings.cassandraUser
  )
  .set(
    "spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.password",
    Settings.cassandraPass
  )
  .set(
    "spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.port",
    Settings.cassandraPort
  )
I am using a catalog because I intend to use databases on different clusters.
Spark session:
val sparkSession: SparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .appName(Settings.appName)
  .getOrCreate()
I tried the following two approaches:
This:
val parameterVOne = spark.read
  .table("CassandraCommercial.ky.parameters")
  .select(
    "id",
    "year",
    "code"
  )
And this:
val parameterVTwo = spark.read
  .cassandraFormat("parameters", "CassandraCommercial.ky")
  .load
  .select(
    "id",
    "year",
    "code"
  )
With the first one, Spark does not use the direct join, but show() still brings back the data correctly:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#19, year#22, code#0]
   +- SortMergeJoin [id#19, year#22, code#0], [id#0, year#3, code#2, value#6], Inner, ((id#19 = id#0) AND (year#22 = year#3) AND (code#0 = code#2))
The second one returns this:
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=307be82d): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/0:0:0:0:0:0:0:1:9042, hostId=null, hashCode=3ebc1052): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]
Apparently the second approach does not pick up the settings defined in the catalog and, unlike the first one, tries to connect directly to localhost.
The dataframe with the keys has only 7 rows, while the Cassandra dataframe has about 2 million.
Here is my build.sbt:
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"

lazy val root = (project in file("."))
  .settings(
    name := "test-job",
    idePackagePrefix := Some("com.teste"),
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.1",
    libraryDependencies += "org.postgresql" % "postgresql" % "42.3.3",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0",
    libraryDependencies += "joda-time" % "joda-time" % "2.10.14",
    libraryDependencies += "com.crealytics" %% "spark-excel" % "3.2.1_0.16.5-pre2",
    libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector-assembly_2.12" % "3.1.0"
  )
I have seen this behavior with some versions of Spark - unfortunately, changes in the Spark internals regularly break this functionality because it relies on internal details. So please provide more information about which versions of Spark & the Spark connector you are using.
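(The Spark version your job actually runs with can be printed at runtime; the connector version is whatever your build pulls in.)

println(s"Spark version: ${sparkSession.version}") // prints the runtime Spark version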
Regarding the second error, I suspect that the direct join may not be picking up Spark SQL properties. Could you try using the spark.cassandra.connection.host, spark.cassandra.auth.password, etc. configuration parameters directly?
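For example, a minimal sketch of what I mean, reusing the Settings values from your question but putting the connection properties under the global spark.cassandra.* keys instead of only under the CassandraCommercial catalog prefix:

val sparkConf: SparkConf = new SparkConf()
  .set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .set("spark.sql.catalog.CassandraCommercial", "com.datastax.spark.connector.datasource.CassandraCatalog")
  // global connection settings, picked up by cassandraFormat reads as well
  .set("spark.cassandra.connection.host", Settings.cassandraServerAddress)
  .set("spark.cassandra.connection.port", Settings.cassandraPort)
  .set("spark.cassandra.auth.username", Settings.cassandraUser)
  .set("spark.cassandra.auth.password", Settings.cassandraPass)

With the connection defined globally, the cassandraFormat read should also receive a plain keyspace name, e.g. .cassandraFormat("parameters", "ky") - the catalog name is not part of the keyspace, which is likely why the second read fell back to localhost.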
P.S. I have a long blog post on using DirectJoin, but it was tested on Spark 2.4.x (maybe on 3.0, I don't remember).
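As a rough sketch of the approach from that post (note: the directJoinSetting option name and the "Cassandra Direct Join" plan node label are from memory, so please verify them against the connector documentation for your version; keysDf below stands for your small 7-row dataframe):

import org.apache.spark.sql.cassandra._

// assumption: directJoinSetting ("on" / "off" / "auto") forces the optimization on
val cassandraDf = sparkSession.read
  .cassandraFormat("parameters", "ky")
  .option("directJoinSetting", "on")
  .load()
  .select("id", "year", "code")

val joined = keysDf.join(cassandraDf, Seq("id", "year", "code"))
joined.explain() // expect a "Cassandra Direct Join" node instead of SortMergeJoin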