由于 Guava 库版本不同,YARN 上的 Spark 与 Elasticsearch TransportClient 冲突

Spark on YARN confilict with Elasticsearch TransportClient because of Guava library different versions

我想 运行 Google 云 VM 集群上的 Spark 作业,在地图操作中我需要对弹性搜索进行查询。我的问题是 Spark 和 Elastic Search 在 Guava 库上有冲突,因为 Spark 使用 Guava 14 和 ES Guava 18。

我的问题是这个方法调用 com.google.common.util.concurrent.MoreExecutors.directExecutor(),在 Guava 18 中存在,但在 Guava 14 中不存在。

更详细地说,我正在尝试做的工作如下所示。

 input.map(record=>{
    val client=openConnection()
    val newdata=client.query(record.someInfo)
      new record(newdata)
})

方法openConnection如下所示

 public static TransportClient openConnection(String ipAddress, int ipPort) throws UnknownHostException {


    Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch").build();
    TransportClient client = TransportClient.builder().settings(settings).build().
            addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), ipPort));

    return client;

}

我尝试通过在sbt文件中添加如下着色规则来使用着色强制ES使用Guava 18:

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided"  ,
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "

 libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",

 assemblyShadeRules in assembly :=Seq(
  ShadeRule.rename("com.google.common.*" -> "googlecommona.@1").
    inLibrary("org.elasticsearch" % "elasticsearch" % "2.2.0"))

但是问题似乎仍然存在。 有没有办法解决这个冲突?

您不能从提供的依赖项中排除过渡依赖项。 通过标记它 provided,您告诉打包器不要将它放入最终的 jar 中,因为您假设它已经在将要部署的类路径中。

所以你不能排除过渡依赖,因为整个依赖本身已经被排除了。

您的选择是:

  • 从 ElasticSearch 库中排除依赖:这意味着 ES 将使用 Spark 提供的 Guava,并且可能是不兼容的版本
  • 切换到具有相同 Guava 版本的 ElasticSearch 库(小版本的差异大部分是兼容的,尽管您可能仍然需要排除它)
  • 切换到与 ElasticSearch 具有相同 Guava 版本的 Spark 版本。
  • 使用阴影:现在可以在 sbt-assembly SBT 插件中使用。底纹是类的重命名。您知道 ElasticSearch 和 Spark 都有自己的 Guava 版本,它们也将其作为依赖项包括在内。于是你让SBT把ES提供的Guava重命名为Guava1,Spark提供的Guava重命名为Guava2,Spark中对Guava的每一次引用都会重命名为Guava1,ES也一样重命名为Guava2。

您可以阅读有关 sbt-assembly 着色的信息 here

阴影是答案:我在 build.sbt 文件中添加了以下规则。

下面的解决方案适用于使用 ElasticSearch TransportClient class.

的 YARN 上的 SPARK 集群
  assemblyShadeRules in assembly :=Seq(
      ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
  )

为了完整起见,我附上了整个 sbt 文件:

import sbt.ExclusionRule
import sbt.Keys._

lazy val root = (project in file(".")).
  settings(
  name := "scala_code",
  version := "1.0",
  scalaVersion := "2.10.6",
  conflictManager := ConflictManager.latestRevision,
  test in assembly := {},
  assemblyMergeStrategy in assembly := {
      case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
      case _ => MergeStrategy.first
  },

  parallelExecution in test := false,
  libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.6.5",
  libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"  exclude("javax.servlet", "servlet-api"),
  libraryDependencies += "org.wikidata.wdtk" % "wdtk-datamodel" % "0.6.0" exclude ("com.fasterxml.jackson.core",  "jackson-annotations"),
  libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided"  ,
  libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
  libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",
  libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
 += "com.typesafe" % "config" % "1.2.1",
  libraryDependencies += "org.jsoup" % "jsoup" % "1.8.3",
  libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",// exclude("com.google.guava", "guava"),

  assemblyShadeRules in assembly :=Seq(
      ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
  )

)