Error while loading data to BigQuery table from Dataproc cluster

I have a Spark job running in Dataproc and I want to load the results into BigQuery. I know I have to add the spark-bigquery connector to save the data to BigQuery:

  name := "spl_prj"

  version := "0.1"

  scalaVersion := "2.11.12"

  val sparkVersion = "2.3.0"

  conflictManager := ConflictManager.latestRevision

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
    "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
    "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
  )

When I build the jar and submit the job, I get this error:

  Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at com.renault.datalake.spl_prj.Main$.main(Main.scala:58)
    at com.renault.datalake.spl_prj.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource

I don't have permission to add jars when submitting the job, and I suspect that when sbt builds the jar the connector isn't being bundled into it. This is the minimal Scala Spark code I want to run:

   import org.apache.spark.SparkConf
   import org.apache.spark.sql.SparkSession

   val conf = new SparkConf() // stands in for the job's existing SparkConf
   val spark = SparkSession.builder.config(conf).getOrCreate()
   val bucket = "doc_spk"
   spark.conf.set("temporaryGcsBucket", bucket)

   val sc = spark.sparkContext
   val rddRowString = sc.binaryRecords("gs://bucket/GAR", 120)
     .map(x => (x.slice(0, 17), x.slice(17, 20), x.slice(20, 120)))
   val df = spark.createDataFrame(rddRowString).toDF("v", "data", "val_data")

   df.write.format("bigquery")
     .option("table", "db.table")
     .save()
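
One way to check that suspicion (a diagnostic sketch, not part of the job itself): Spark resolves the short name bigquery through Java's ServiceLoader, so printing every DataSourceRegister implementation visible on the driver's classpath shows whether the connector actually made it into the jar:

   import java.util.ServiceLoader
   import org.apache.spark.sql.sources.DataSourceRegister
   import scala.collection.JavaConverters._

   // Print the short name of every data source registered via
   // META-INF/services; if "bigquery" is missing, the connector
   // was not packaged into the jar.
   ServiceLoader.load(classOf[DataSourceRegister]).asScala
     .foreach(ds => println(ds.shortName()))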

Build a fat jar using the build.sbt file below. The key part is the assemblyMergeStrategy: Spark discovers data sources through registry files under META-INF/services, so those files must be merged (filterDistinctLines) rather than discarded along with the rest of META-INF.

build.sbt

name := "spl_prj"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
conflictManager := ConflictManager.latestRevision

libraryDependencies ++= Seq(
  "org.apache.spark" %%"spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided ,
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
)
assemblyMergeStrategy in assembly := {
  case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF",xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
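
Discarding those service files is exactly what produces the ClassNotFoundException: bigquery.DefaultSource above: when no registry entry matches the short name, Spark falls back to trying to load a class literally named bigquery.DefaultSource. After assembling, you can verify the registry survived (the jar path assumes the assemblyJarName setting above and sbt's default output directory):

jar tf target/scala-2.11/spl_prj-0.1.jar | grep DataSourceRegister
unzip -p target/scala-2.11/spl_prj-0.1.jar META-INF/services/org.apache.spark.sql.sources.DataSourceRegister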

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

Create a project/plugins.sbt file and add the following:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")

Run the command below to create the fat jar:

sbt clean compile assembly
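
Then submit the assembled jar to the Dataproc cluster, for example (cluster name and region are placeholders; the main class is taken from the stack trace above):

gcloud dataproc jobs submit spark \
  --cluster=my-cluster \
  --region=us-central1 \
  --class=com.renault.datalake.spl_prj.Main \
  --jars=target/scala-2.11/spl_prj-0.1.jar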

Note: you can adjust the versions to match your project's requirements.