由于 CodecConfigurationException,将 CSV 文件数据上传到 mongo 数据库的 Scala Spark 作业失败

Scala Spark job to upload CSV file data to mongo DB fails due to CodecConfigurationException

我是 spark 和 scala 的新手。我正在尝试使用 Scala 中的 spark 作业将 csv 文件上传到 Mongo 数据库。

上传时,在作业执行期间遇到以下错误,

org.bson.codecs.configuration.CodecConfigurationException:找不到 class 的编解码器。

输入文件的路径将在执行过程中传递。

过去 2 天我一直被这个问题困扰。感谢任何解决此问题的帮助。

谢谢。

我试过用它上传到弹性搜索,效果非常好。

import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config


object MongoUpload {
    val host = <host>
    val user = <user>
    val pwd = <password>
    val database = <db>
    val collection = <collection>
    val uri = "mongodb://${user}:${pwd}@${host}/"
    val NOW = java.time.LocalDate.now.toString

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Mongo-Test-Upload")
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()

    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .saveToMongoDB(
        WriteConfig(
            Map(
                "uri" -> uri,
                "database" -> database,
                "collection" -> collection
            )
        )
      )
   }


  def toEligibility(row: Row): Eligibility =
    Eligibility(
      row.getAs[String]("DATE_OF_BIRTH"),
      row.getAs[String]("GENDER"),
      row.getAs[String]("INDIVIDUAL_ID"),
      row.getAs[String]("PRODUCT_NAME"),
      row.getAs[String]("STATE_CODE"),
      row.getAs[String]("ZIPCODE"),
      NOW
    )
}

case class Eligibility (
  dateOfBirth: String,
  gender: String,
  recordId: String,
  ProductIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)

Spark 作业失败并出现以下错误,原因是:org.bson.codecs.configuration.CodecConfigurationException:找不到符合 class 资格的编解码器

您可以映射到所需格式的 Document 或转换为 Dataset 然后保存,例如:

    import spark.implicits._
    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .toDS()
      .write()
      .format("com.mongodb.spark.sql.DefaultSource")
      .options(Map("uri" -> uri,"database" -> database, "collection" -> collection)
      .save()
   }