java.lang.NoSuchMethodError: com.mongodb.internal.operation.SyncOperations.aggregate

I am trying to build an application as a proof of concept that uses Spark from Scala and leverages MongoDB. So far I am able to connect to Spark and to MongoDB separately, but when I try to wire Spark and MongoDB together, I get the following error:

Exception in thread "main" java.lang.NoSuchMethodError: com.mongodb.internal.operation.SyncOperations.aggregate(Ljava/util/List;Ljava/lang/Class;JJLjava/lang/Integer;Lcom/mongodb/client/model/Collation;Lorg/bson/conversions/Bson;Ljava/lang/String;Ljava/lang/String;Lorg/bson/conversions/Bson;Ljava/lang/Boolean;Lcom/mongodb/internal/client/model/AggregationLevel;)Lcom/mongodb/internal/operation/ExplainableReadOperation;
    at com.mongodb.client.internal.AggregateIterableImpl.asAggregateOperation(AggregateIterableImpl.java:213)
    at com.mongodb.client.internal.AggregateIterableImpl.asReadOperation(AggregateIterableImpl.java:208)
    at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:135)
    at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:92)
    at com.mongodb.client.internal.MongoIterableImpl.forEach(MongoIterableImpl.java:121)
    at com.mongodb.client.internal.MongoIterableImpl.into(MongoIterableImpl.java:130)
    at com.mongodb.spark.sql.connector.schema.InferSchema.lambda$inferSchema$0(InferSchema.java:81)
    at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.withCollection(AbstractMongoConfig.java:170)
    at com.mongodb.spark.sql.connector.config.ReadConfig.withCollection(ReadConfig.java:45)
    at com.mongodb.spark.sql.connector.schema.InferSchema.inferSchema(InferSchema.java:81)
    at com.mongodb.spark.sql.connector.MongoTableProvider.inferSchema(MongoTableProvider.java:62)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:295)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at WordCountCucumberScala_GhislainGripon.SparkMapReduceTask.execute(SparkMapReduceTask.scala:24)
    at WordCountCucumberScala_GhislainGripon.MapReduce$.main(MapReduce.scala:12)
    at WordCountCucumberScala_GhislainGripon.MapReduce.main(MapReduce.scala)

This only happens when I try to reach into MongoDB. I am on IntelliJ IDEA with Spark 3.1.3, MongoDB 5.0.7, the mongo-spark-connector 10.0.1, Scala 2.12.15 and Java 1.8.

package WordCountCucumberScala_GhislainGripon

import org.apache.spark.SparkConf
import org.apache.spark.sql._
// No com.mongodb.spark._ import is needed with connector 10.x:
// it is resolved through its DataSource name in spark.read.format("mongodb")

class SparkMapReduceTask(config: Configuration) extends Task {
  override def execute(): Unit = {

    val connectionString = s"mongodb://${config.getUsername}:${config.getPassword}@${config.host}:${config.port}/?authSource=${config.database}"

    val sparkConf = new SparkConf()
      .set("spark.mongodb.read.connection.uri", connectionString)
      .set("spark.mongodb.write.connection.uri", connectionString)
      .setMaster("local")
      .setAppName("MapReduce")

    val spark = SparkSession.builder()
      .config(sparkConf)
      .getOrCreate()

    import spark.implicits._
    //spark.sparkContext.setLogLevel("WARN")
    //val textRDD = spark.read.text(config.data_dir + "/" + config.main_data + ".txt")
    //textRDD.flatMap(line => "(((?U)\w)+)".r.findAllIn(line.mkString).toList)
      //.groupBy($"value").count().orderBy($"count".desc).write.mode("overwrite").csv(config.data_dir + "/MapReduceResults")

    // Load the collection through the v10 connector's DataSource name
    val testRDD = spark.read.format("mongodb")
      .option("database", config.database)
      .option("collection", config.text_table)
      .load()
    testRDD.show()

  }

}
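
As an aside, with the 10.x connector the database and collection can also be set once on the SparkConf instead of per read (a sketch using the 10.0.x option names; config is the same Configuration instance as above):

val sparkConf = new SparkConf()
  .set("spark.mongodb.read.connection.uri", connectionString)
  .set("spark.mongodb.read.database", config.database)
  .set("spark.mongodb.read.collection", config.text_table)
  .setMaster("local")
  .setAppName("MapReduce")

// With those defaults in place, the read shrinks to:
val df = spark.read.format("mongodb").load()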

I use a Configuration class to fetch parameters from a configuration file, and its data feeds the MongoDB connection information. I have tried changing the library versions of Spark, MongoDB and the connector, to no avail.
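
For reference, the Configuration class is roughly shaped as follows (a minimal sketch inferred from the accessors used above; the real class loads these values from a configuration file, presumably via the circe-yaml dependency in the pom):

// Hypothetical reconstruction: field names are taken from the usages
// above (getUsername, getPassword, host, port, database, text_table)
case class Configuration(
    username: String,
    password: String,
    host: String,
    port: Int,
    database: String,
    text_table: String) {
  def getUsername: String = username
  def getPassword: String = password
}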

Here is the pom.xml:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>wewyse</groupId>
  <artifactId>WordCountCucumberScala_GhislainGripon</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.15</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <spec2.version>4.15.0</spec2.version>
    <spark.version>3.1.3</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatestplus</groupId>
      <artifactId>junit-4-13_${scala.compat.version}</artifactId>
      <version>3.2.12.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb.scala</groupId>
      <artifactId>mongo-scala-driver_${scala.compat.version}</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb.spark</groupId>
      <artifactId>mongo-spark-connector</artifactId>
      <version>10.0.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb.scala</groupId>
      <artifactId>mongo-scala-bson_${scala.compat.version}</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>bson</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver-reactivestreams</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver-core</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>bson-record-codec</artifactId>
      <version>4.6.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.2.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cucumber</groupId>
      <artifactId>cucumber-scala_${scala.compat.version}</artifactId>
      <version>8.2.6</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cucumber</groupId>
      <artifactId>cucumber-junit-platform-engine</artifactId>
      <version>7.3.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cucumber</groupId>
      <artifactId>cucumber-junit</artifactId>
      <version>7.3.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cucumber</groupId>
      <artifactId>cucumber-core</artifactId>
      <version>7.3.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.circe</groupId>
      <artifactId>circe-yaml_${scala.compat.version}</artifactId>
      <version>0.14.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>io.circe</groupId>
      <artifactId>circe-core_${scala.compat.version}</artifactId>
      <version>0.14.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>io.circe</groupId>
      <artifactId>circe-parser_${scala.compat.version}</artifactId>
      <version>0.14.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>io.circe</groupId>
      <artifactId>circe-generic-extras_${scala.compat.version}</artifactId>
      <version>0.14.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.12.15</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

I have searched around for answers to this problem and found that the Spark version I was using before might be too recent and unsupported by the connector, so I downgraded it from 3.2.1 to 3.1.3. That did not make the error go away, however, and I do not understand where the problem comes from. I checked the compatibility requirements: mongo-spark-connector 10.0.1 is built for Spark 3.1.x and MongoDB 4.0 or later, though perhaps that only means 4.0.x?
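
One way to test the version-mismatch theory without dropping the explicit driver dependencies would be to align them all with the driver line the connector itself was built against (a sketch; the 4.5.x series is an assumption here and should be verified against the connector's published pom, e.g. for mongodb-driver-core):

<!-- assumed driver line for connector 10.0.1; verify before using -->
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongodb-driver-core</artifactId>
  <version>4.5.1</version>
</dependency>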

I can reach MongoDB from Scala with the URI shown above, and I followed the instructions for version 10.0.1 given on the MongoDB website.
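
For completeness, the standalone access that works looks roughly like this (a minimal sketch against the mongo-scala-driver 4.6.0 from the pom; the URI and database name are placeholders for the values built from the Configuration):

import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration._

object MongoPing extends App {
  // Same URI shape as the connectionString built in SparkMapReduceTask
  val client = MongoClient("mongodb://user:password@localhost:27017/?authSource=test")
  // listCollectionNames() returns an Observable; toFuture() gathers all names
  val names = Await.result(
    client.getDatabase("test").listCollectionNames().toFuture(),
    10.seconds)
  println(s"Collections: ${names.mkString(", ")}")
  client.close()
}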

After downgrading Spark to 3.0.3 and MongoDB to 4.0, and removing every MongoDB library other than mongo-spark-connector from my pom.xml, it now works most of the time, even though it occasionally throws errors for code that runs fine on other occasions.
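
The MongoDB side of the dependency list then reduces to the connector alone, which pulls in its own matching Java driver transitively (a sketch of the trimmed section):

<dependency>
  <groupId>org.mongodb.spark</groupId>
  <artifactId>mongo-spark-connector</artifactId>
  <version>10.0.1</version>
  <scope>compile</scope>
</dependency>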

The error was probably due to duplicated method signatures: the connector uses the MongoDB Java driver under the hood, which is very close to the Scala driver, so both put near-identical signatures on the classpath in incompatible versions, and the JVM ends up linking against the wrong one at runtime and failing.
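
One way to confirm a clash like this (a generic JVM diagnostic, not specific to the connector) is to print which jar the class named in the NoSuchMethodError was actually loaded from:

// If the printed location is not the driver jar the connector expects,
// the classpath carries a second, incompatible MongoDB driver
val clazz = Class.forName("com.mongodb.internal.operation.SyncOperations")
println(clazz.getProtectionDomain.getCodeSource.getLocation)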