Netty 版本与 Spark + Elasticsearch Transport 冲突

Netty Version Conflict with Spark + Elasticsearch Transport

这有几个之前的问题,有答案,但答案通常没有足够清晰的信息来解决问题。

我正在使用 Apache Spark 将数据提取到 Elasticsearch 中。我们正在使用 X-Pack 安全性及其相应的传输客户端。在特殊情况下,我使用传输客户端到 create/delete 索引,然后使用 Spark 进行摄取。当我们的代码到达 client.close() 时抛出异常:

Exception in thread "elasticsearch[_client_][generic][T#2]" java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
        at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal(Netty4Transport.java:443)
        at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
        at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:426)
        at org.elasticsearch.transport.TcpTransport.lambda$doStop(TcpTransport.java:959)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

起初,我认为X-Pack Transport客户端使用的是从Spark进来的Netty,所以我排除了它。即使在排除它之后,我们 运行 也会遇到同样的问题。这是我们的一组依赖项:

    libraryDependencies ++= Seq(
   "com.crealytics" % "spark-excel_2.11" % "0.9.1" exclude("io.netty", "netty-all"),
  "com.github.alexarchambault" %% "scalacheck-shapeless_1.13" % "1.1.6" % Test,
  "com.holdenkarau" % "spark-testing-base_2.11" % "2.2.0_0.7.4" % Test exclude("org.scalatest", "scalatest_2.11") ,
  "com.opentable.components" % "otj-pg-embedded" % "0.9.0" % Test,
  "org.apache.spark" % "spark-core_2.11" % "2.2.0" % "provided" exclude("org.scalatest", "scalatest_2.11") exclude("io.netty", "netty-all"),
  "org.apache.spark" % "spark-sql_2.11" % "2.2.0" % "provided" exclude("org.scalatest", "scalatest_2.11") exclude("io.netty", "netty-all"),
  "org.apache.spark" % "spark-hive_2.11" % "2.2.0" % "provided" exclude("org.scalatest", "scalatest_2.11") exclude("io.netty", "netty-all"),
  "org.apache.logging.log4j" % "log4j-core" %"2.8.2",
  "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.5.0" exclude("org.scalatest", "scalatest_2.11") exclude("io.netty", "netty-all"),
  "org.elasticsearch.client" % "x-pack-transport" % "5.5.0",
  "org.elasticsearch.client" % "transport" % "5.5.0",
  "org.elasticsearch.test" % "framework" % "5.4.3" % Test,
  "org.postgresql" % "postgresql" % "42.1.4",
  "org.scalamock" %% "scalamock-scalatest-support" % "3.5.0" % Test,
  "org.scalatest" % "scalatest_2.11" % "3.0.1" % Test,
  "org.scalacheck" %% "scalacheck" % "1.13.4" % Test,
  "org.scalactic" %% "scalactic" % "3.0.1",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test,
  "mysql" % "mysql-connector-java" % "5.1.44"
      )

我用 sbt dependencyTree 验证了 SBT 没有从 Spark 和 spark-excel 中排除 netty,但我不确定为什么...我们使用的是 SBT 1.0.4。

更新:spark-submit/Spark 是罪魁祸首,请在下面回答!

好吧,几经磨难,我想通了。问题不在于 SBT 未能排除库,而是完美地排除了它们。问题是,即使我排除了任何不是 4.1.11.Final 的 Netty 版本,Spark 仍在使用它自己的 jar,在 SBT 和我构建的 jar 之外。

spark-submit 为 运行 时,它包括来自 $SPARK_HOME/lib 目录的 jar。其中之一是 Netty 4 的旧版本。此调用显示了此问题:

bootstrap.getClass().getProtectionDomain().getCodeSource()

结果是 /usr/local/Cellar/apache-spark/2.2.0/libexec/jars/netty-all-4.0.43.Final.jar

的 jar 位置

因此,Spark 包含了它自己的 Netty 依赖项。当我在 SBT 中创建我的罐子时,它有正确的罐子。 Spark 对此有一个配置,称为 spark.driver.userClassPathFirst 记录 in the Spark config documentation 但是当我将其设置为 true 时,我最终遇到了与使用更高版本的 Netty 有关的问题。

我决定放弃使用 Transport 客户端,转而使用可靠的旧 HTTP 请求。

我遇到了同样的问题,需要将 Netty 与 Spark 结合使用的依赖项。我也尝试了 spark.driver.userClassPathFirst 选项,但它没有用。我确实找到了另一个我认为我会分享的解决方法,以防将来对其他人有帮助。

因为我们正在创建一个与 spark-submit 一起使用的程序集 jar,所以我想我可以只隐藏程序集 jar 中的依赖关系,这样 spark-submit 就可以引入它自己的 Netty 版本而不会发生冲突。我们正在使用 https://github.com/sbt/sbt-assembly 插件,所以我需要做的就是将其包含在我的 build.sbt 相关模块中:

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("io.netty.**" -> "shadenetty.@1").inAll
)

从 spark-core 中排除 Netty 依赖对我们有用

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-all</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty</artifactId>
                </exclusion>
            </exclusions>
        </dependency>