由于 Guava 库版本不同,YARN 上的 Spark 与 Elasticsearch TransportClient 冲突
Spark on YARN confilict with Elasticsearch TransportClient because of Guava library different versions
我想 运行 Google 云 VM 集群上的 Spark 作业,在地图操作中我需要对弹性搜索进行查询。我的问题是 Spark 和 Elastic Search 在 Guava 库上有冲突,因为 Spark 使用 Guava 14 和 ES Guava 18。
我的问题是这个方法调用
com.google.common.util.concurrent.MoreExecutors.directExecutor()
,在 Guava 18 中存在,但在 Guava 14 中不存在。
更详细地说,我正在尝试做的工作如下所示。
input.map(record=>{
val client=openConnection()
val newdata=client.query(record.someInfo)
new record(newdata)
})
方法openConnection
如下所示
public static TransportClient openConnection(String ipAddress, int ipPort) throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch").build();
TransportClient client = TransportClient.builder().settings(settings).build().
addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), ipPort));
return client;
}
我尝试通过在sbt文件中添加如下着色规则来使用着色强制ES使用Guava 18:
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.common.*" -> "googlecommona.@1").
inLibrary("org.elasticsearch" % "elasticsearch" % "2.2.0"))
但是问题似乎仍然存在。
有没有办法解决这个冲突?
您不能从提供的依赖项中排除过渡依赖项。
通过标记它 provided
,您告诉打包器不要将它放入最终的 jar 中,因为您假设它已经在将要部署的类路径中。
所以你不能排除过渡依赖,因为整个依赖本身已经被排除了。
您的选择是:
- 从 ElasticSearch 库中排除依赖:这意味着 ES 将使用 Spark 提供的 Guava,并且可能是不兼容的版本
- 切换到具有相同 Guava 版本的 ElasticSearch 库(小版本的差异大部分是兼容的,尽管您可能仍然需要排除它)
- 切换到与 ElasticSearch 具有相同 Guava 版本的 Spark 版本。
- 使用阴影:现在可以在
sbt-assembly
SBT 插件中使用。底纹是类的重命名。您知道 ElasticSearch 和 Spark 都有自己的 Guava 版本,它们也将其作为依赖项包括在内。于是你让SBT把ES提供的Guava重命名为Guava1,Spark提供的Guava重命名为Guava2,Spark中对Guava的每一次引用都会重命名为Guava1,ES也一样重命名为Guava2。
您可以阅读有关 sbt-assembly 着色的信息 here。
阴影是答案:我在 build.sbt
文件中添加了以下规则。
下面的解决方案适用于使用 ElasticSearch TransportClient
class.
的 YARN 上的 SPARK 集群
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)
为了完整起见,我附上了整个 sbt 文件:
import sbt.ExclusionRule
import sbt.Keys._
lazy val root = (project in file(".")).
settings(
name := "scala_code",
version := "1.0",
scalaVersion := "2.10.6",
conflictManager := ConflictManager.latestRevision,
test in assembly := {},
assemblyMergeStrategy in assembly := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case _ => MergeStrategy.first
},
parallelExecution in test := false,
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.6.5",
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided" exclude("javax.servlet", "servlet-api"),
libraryDependencies += "org.wikidata.wdtk" % "wdtk-datamodel" % "0.6.0" exclude ("com.fasterxml.jackson.core", "jackson-annotations"),
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",
libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
+= "com.typesafe" % "config" % "1.2.1",
libraryDependencies += "org.jsoup" % "jsoup" % "1.8.3",
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",// exclude("com.google.guava", "guava"),
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)
)
我想 运行 Google 云 VM 集群上的 Spark 作业,在地图操作中我需要对弹性搜索进行查询。我的问题是 Spark 和 Elastic Search 在 Guava 库上有冲突,因为 Spark 使用 Guava 14 和 ES Guava 18。
我的问题是这个方法调用
com.google.common.util.concurrent.MoreExecutors.directExecutor()
,在 Guava 18 中存在,但在 Guava 14 中不存在。
更详细地说,我正在尝试做的工作如下所示。
input.map(record=>{
val client=openConnection()
val newdata=client.query(record.someInfo)
new record(newdata)
})
方法openConnection
如下所示
public static TransportClient openConnection(String ipAddress, int ipPort) throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch").build();
TransportClient client = TransportClient.builder().settings(settings).build().
addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), ipPort));
return client;
}
我尝试通过在sbt文件中添加如下着色规则来使用着色强制ES使用Guava 18:
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.common.*" -> "googlecommona.@1").
inLibrary("org.elasticsearch" % "elasticsearch" % "2.2.0"))
但是问题似乎仍然存在。 有没有办法解决这个冲突?
您不能从提供的依赖项中排除过渡依赖项。
通过标记它 provided
,您告诉打包器不要将它放入最终的 jar 中,因为您假设它已经在将要部署的类路径中。
所以你不能排除过渡依赖,因为整个依赖本身已经被排除了。
您的选择是:
- 从 ElasticSearch 库中排除依赖:这意味着 ES 将使用 Spark 提供的 Guava,并且可能是不兼容的版本
- 切换到具有相同 Guava 版本的 ElasticSearch 库(小版本的差异大部分是兼容的,尽管您可能仍然需要排除它)
- 切换到与 ElasticSearch 具有相同 Guava 版本的 Spark 版本。
- 使用阴影:现在可以在
sbt-assembly
SBT 插件中使用。底纹是类的重命名。您知道 ElasticSearch 和 Spark 都有自己的 Guava 版本,它们也将其作为依赖项包括在内。于是你让SBT把ES提供的Guava重命名为Guava1,Spark提供的Guava重命名为Guava2,Spark中对Guava的每一次引用都会重命名为Guava1,ES也一样重命名为Guava2。
您可以阅读有关 sbt-assembly 着色的信息 here。
阴影是答案:我在 build.sbt
文件中添加了以下规则。
下面的解决方案适用于使用 ElasticSearch TransportClient
class.
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)
为了完整起见,我附上了整个 sbt 文件:
import sbt.ExclusionRule
import sbt.Keys._
lazy val root = (project in file(".")).
settings(
name := "scala_code",
version := "1.0",
scalaVersion := "2.10.6",
conflictManager := ConflictManager.latestRevision,
test in assembly := {},
assemblyMergeStrategy in assembly := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case _ => MergeStrategy.first
},
parallelExecution in test := false,
libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.6.5",
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided" exclude("javax.servlet", "servlet-api"),
libraryDependencies += "org.wikidata.wdtk" % "wdtk-datamodel" % "0.6.0" exclude ("com.fasterxml.jackson.core", "jackson-annotations"),
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",
libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
+= "com.typesafe" % "config" % "1.2.1",
libraryDependencies += "org.jsoup" % "jsoup" % "1.8.3",
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",// exclude("com.google.guava", "guava"),
assemblyShadeRules in assembly :=Seq(
ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)
)