由于 CodecConfigurationException,将 CSV 文件数据上传到 mongo 数据库的 Scala Spark 作业失败
Scala Spark job to upload CSV file data to mongo DB fails due to CodecConfigurationException
我是 spark 和 scala 的新手。我正在尝试使用 Scala 中的 spark 作业将 csv 文件上传到 Mongo 数据库。
上传时,在作业执行期间遇到以下错误,
org.bson.codecs.configuration.CodecConfigurationException:找不到 class 的编解码器。
输入文件的路径将在执行过程中传递。
过去 2 天我一直被这个问题困扰。感谢任何解决此问题的帮助。
谢谢。
我试过用它上传到弹性搜索,效果非常好。
import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config
object MongoUpload {
val host = <host>
val user = <user>
val pwd = <password>
val database = <db>
val collection = <collection>
val uri = "mongodb://${user}:${pwd}@${host}/"
val NOW = java.time.LocalDate.now.toString
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Mongo-Test-Upload")
.config("spark.mongodb.output.uri", uri)
.getOrCreate()
spark
.read
.format("csv")
.option("header", "true")
.load(args(0))
.rdd
.map(toEligibility)
.saveToMongoDB(
WriteConfig(
Map(
"uri" -> uri,
"database" -> database,
"collection" -> collection
)
)
)
}
def toEligibility(row: Row): Eligibility =
Eligibility(
row.getAs[String]("DATE_OF_BIRTH"),
row.getAs[String]("GENDER"),
row.getAs[String]("INDIVIDUAL_ID"),
row.getAs[String]("PRODUCT_NAME"),
row.getAs[String]("STATE_CODE"),
row.getAs[String]("ZIPCODE"),
NOW
)
}
case class Eligibility (
dateOfBirth: String,
gender: String,
recordId: String,
ProductIdentifier: String,
stateCode: String,
zipCode: String,
updateDate: String
)
Spark 作业失败并出现以下错误,原因是:org.bson.codecs.configuration.CodecConfigurationException:找不到符合 class 资格的编解码器
您可以映射到所需格式的 Document
或转换为 Dataset
然后保存,例如:
import spark.implicits._
spark
.read
.format("csv")
.option("header", "true")
.load(args(0))
.rdd
.map(toEligibility)
.toDS()
.write()
.format("com.mongodb.spark.sql.DefaultSource")
.options(Map("uri" -> uri,"database" -> database, "collection" -> collection)
.save()
}
我是 spark 和 scala 的新手。我正在尝试使用 Scala 中的 spark 作业将 csv 文件上传到 Mongo 数据库。
上传时,在作业执行期间遇到以下错误,
org.bson.codecs.configuration.CodecConfigurationException:找不到 class 的编解码器。
输入文件的路径将在执行过程中传递。
过去 2 天我一直被这个问题困扰。感谢任何解决此问题的帮助。
谢谢。
我试过用它上传到弹性搜索,效果非常好。
import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config
object MongoUpload {
val host = <host>
val user = <user>
val pwd = <password>
val database = <db>
val collection = <collection>
val uri = "mongodb://${user}:${pwd}@${host}/"
val NOW = java.time.LocalDate.now.toString
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Mongo-Test-Upload")
.config("spark.mongodb.output.uri", uri)
.getOrCreate()
spark
.read
.format("csv")
.option("header", "true")
.load(args(0))
.rdd
.map(toEligibility)
.saveToMongoDB(
WriteConfig(
Map(
"uri" -> uri,
"database" -> database,
"collection" -> collection
)
)
)
}
def toEligibility(row: Row): Eligibility =
Eligibility(
row.getAs[String]("DATE_OF_BIRTH"),
row.getAs[String]("GENDER"),
row.getAs[String]("INDIVIDUAL_ID"),
row.getAs[String]("PRODUCT_NAME"),
row.getAs[String]("STATE_CODE"),
row.getAs[String]("ZIPCODE"),
NOW
)
}
case class Eligibility (
dateOfBirth: String,
gender: String,
recordId: String,
ProductIdentifier: String,
stateCode: String,
zipCode: String,
updateDate: String
)
Spark 作业失败并出现以下错误,原因是:org.bson.codecs.configuration.CodecConfigurationException:找不到符合 class 资格的编解码器
您可以映射到所需格式的 Document
或转换为 Dataset
然后保存,例如:
import spark.implicits._
spark
.read
.format("csv")
.option("header", "true")
.load(args(0))
.rdd
.map(toEligibility)
.toDS()
.write()
.format("com.mongodb.spark.sql.DefaultSource")
.options(Map("uri" -> uri,"database" -> database, "collection" -> collection)
.save()
}