Scala upserts using JDBC PreparedStatement and foreachPartition

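I am new to Scala/Java programming.

I need to create a Scala jdbcTemplate that maps multiple columns to a SQL query for a PostgreSQL database.

My insert query has about 80 columns.

It looks like this: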

INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd) 
DO  UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?

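The values for the "?" placeholders are stored in another DataFrame named inputdatafiledfwindow.

The column mapping, i.e. the calls that set the values on the prepared statement, has to be generated dynamically.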

          val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
                st.setShort(1, row.getShort(0))
                st.setInt(2, row.getInt(1))
                st.setString(3, row.getString(2).replaceAll("\000", ""))
                st.setString(4, row.getString(3).replaceAll("\000", ""))
                st.setString(5, row.getString(4).replaceAll("\000", ""))
                st.setString(6, row.getString(5).replaceAll("\000", ""))
                st.setDate(7, row.getDate(6))
                st.setDate(8, row.getDate(7))
                st.setString(9, row.getString(8).replaceAll("\000", ""))
                st.setString(10, row.getString(9).replaceAll("\000", ""))
                st.setString(11, row.getString(10).replaceAll("\000", ""))
                st.setString(12, row.getString(11).replaceAll("\000", ""))
                st.setString(13, row.getString(12).replaceAll("\000", ""))
                st.setString(14, row.getString(13).replaceAll("\000", ""))
                st.setString(15, row.getString(14).replaceAll("\000", ""))
                st.setString(16, row.getString(15).replaceAll("\000", ""))
                st.setString(17, row.getString(16).replaceAll("\000", ""))
                st.setString(18, row.getString(17).replaceAll("\000", ""))
                st.setString(19, row.getString(18).replaceAll("\000", ""))
                st.setString(20, row.getString(19).replaceAll("\000", ""))
                st.setString(21, row.getString(20).replaceAll("\000", ""))
                st.setString(22, row.getString(21).replaceAll("\000", ""))
                st.setString(23, row.getString(22).replaceAll("\000", ""))
                st.setString(24, row.getString(23).replaceAll("\000", ""))
                st.setString(25, row.getString(24).replaceAll("\000", ""))
                st.setString(26, row.getString(25).replaceAll("\000", ""))

          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

Currently I am trying something like this:

val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)

inputdatafiledfwindow.coalesce(10).foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
      val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames.length-1) {
            if (columnDataTypes(i) == "ShortType")
              st.setShort((i+1).toInt, row.getShort(i))
            else if(columnDataTypes(i)== "IntegerType")
              st.setInt((i+1).toInt,row.getInt(i))
            else if (columnDataTypes(i)=="StringType")
              st.setString((i+1).toInt,row.getString(i))
            else if(columnDataTypes(i)=="TimestampType")
              st.setTimestamp((i+1).toInt, row.getTimestamp(i))
            else if(columnDataTypes(i)=="DateType")
              st.setDate((i+1).toInt,row.getDate(i))
            else if (columnDataTypes(i)=="DoubleType")
              st.setDouble((i+1).toInt, row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

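This gives me: org.apache.spark.SparkException: Task not serializable

Are there any ideas or resources I can refer to in order to implement this? I know this is possible in Java, but I haven't worked much in either Java or Scala.

Edit: I tried using broadcast variables inside foreachPartition and I am still getting org.apache.spark.SparkException: Task not serializable

Below is the full exception stack trace: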


org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:957)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:956)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply$mcV$sp(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId.apply(Dataset.scala:3349)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
  ... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
        - object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
        - field (class: $iw, name: dbc, type: interface java.sql.Connection)
        - object (class $iw, $iw@22459ca5)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@788dd40c)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@31c725ed)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a367987)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@7cffd7ab)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3c615880)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@289fa6c2)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2a5a0934)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a04a12a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@c5fe90a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@58b67f02)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@243a4a22)
        - field (class: $line77.$read, name: $iw, type: class $iw)
        - object (class $line77.$read, $line77.$read@5f473976)
        - field (class: $iw, name: $line77$read, type: class $line77.$read)
        - object (class $iw, $iw@70fc6803)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@26818b0)
        - field (class: $anonfun, name: $outer, type: class $iw)
        - object (class $anonfun, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 98 more

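Update 2: I made the changes suggested by @RamGhadiyaram, but now I am facing a new NullPointerException. I don't understand where I am going wrong. Surely there must be a simple fix.

Below is the updated code: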

val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

    val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"

    val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)


    inputdatafiledfwindow.foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(columnNames)
      val columnDataTypes_br = sc.broadcast(columnDataTypes)

      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames_br.value.length-1) {
            if (columnDataTypes_br.value(i) == "ShortType")
              st.setShort((i+1), row.getShort(i))
            else if(columnDataTypes_br.value(i)== "IntegerType")
              st.setInt((i+1),row.getInt(i))
            else if (columnDataTypes_br.value(i)=="StringType")
              st.setString((i+1),row.getString(i))
            else if(columnDataTypes_br.value(i)=="TimestampType")
              st.setTimestamp((i+1), row.getTimestamp(i))
            else if(columnDataTypes_br.value(i)=="DateType")
              st.setDate((i+1),row.getDate(i))
            else if (columnDataTypes_br.value(i)=="DoubleType")
              st.setDouble((i+1), row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

Below is the new exception stack trace:

20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:87)
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
        at $anonfun.apply(<console>:87)
        at $anonfun.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1504)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
  at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:924)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply$mcV$sp(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
  ... 86 elided
Caused by: java.lang.NullPointerException
  at $anonfun.apply(<console>:87)
  at $anonfun.apply(<console>:86)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:926)
  at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
  at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

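As I understand it, you need to bind all the columns dynamically and insert each one into the corresponding table column with the correct data type.

For the existing DataFrame: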

    val columnNames : Array[String] = inputdatafiledfwindow.columns

    val columnDataTypes : Array[String] = inputdatafiledfwindow.schema.fields
                            .map(x=>x.dataType)
                            .map(x=>x.toString)

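Now you have the columns and their corresponding data types.

You can implement this in a loop that checks each data type dynamically and calls the appropriate psmt.setXXX method on the prepared statement. The array index determines the parameter index for setXXX; since JDBC parameters are 1-based and the array is 0-based, the parameter index is the array index plus one.

In this case there is no need for the Spring JdbcTemplate; the same thing can be achieved with plain JDBC.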
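The loop described above could be sketched roughly as follows. This is only an illustration, not the asker's final code: the names RowBinder, bindRow and jdbcTypeOf are hypothetical, only the six data types that appear in the question are handled, and it assumes the SQL statement has exactly one "?" per DataFrame column, in column order. Null values are bound with setNull so that the primitive getters are never called on a null field.

```scala
import java.sql.{PreparedStatement, Types}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical helper: binds one Spark Row to a PreparedStatement by data type.
object RowBinder {

  // Maps the Spark types used in the question to java.sql.Types codes (needed by setNull).
  private def jdbcTypeOf(dt: DataType): Int = dt match {
    case ShortType     => Types.SMALLINT
    case IntegerType   => Types.INTEGER
    case StringType    => Types.VARCHAR
    case TimestampType => Types.TIMESTAMP
    case DateType      => Types.DATE
    case DoubleType    => Types.DOUBLE
    case other         => throw new IllegalArgumentException(s"Unsupported column type: $other")
  }

  // Binds column i of the row to JDBC parameter i + 1.
  def bindRow(st: PreparedStatement, row: Row, schema: StructType): Unit =
    schema.fields.zipWithIndex.foreach { case (field, i) =>
      val idx = i + 1 // JDBC parameters are 1-based, Row fields are 0-based
      if (row.isNullAt(i)) st.setNull(idx, jdbcTypeOf(field.dataType))
      else field.dataType match {
        case ShortType     => st.setShort(idx, row.getShort(i))
        case IntegerType   => st.setInt(idx, row.getInt(i))
        case StringType    => st.setString(idx, row.getString(i))
        case TimestampType => st.setTimestamp(idx, row.getTimestamp(i))
        case DateType      => st.setDate(idx, row.getDate(i))
        case DoubleType    => st.setDouble(idx, row.getDouble(i))
        case other         => throw new IllegalArgumentException(s"Unsupported column type: $other")
      }
    }
}
```

Matching on the DataType objects themselves avoids comparing type names as strings, and the explicit failure for unknown types makes a missing case visible instead of silently leaving a parameter unbound. Note that the query in the question contains more placeholders than columns (the DO UPDATE SET part repeats them), so those extra parameters would still need to be bound separately.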

Update 1:

Your column types array (columnDataTypes) and column names array (columnNames) should be broadcast using broadcast variables so that they can be used inside foreachPartition. Not doing so might be the root cause of the org.apache.spark.SparkException: Task not serializable error.


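Update 2: Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection. Something about your connection is causing this problem. The serialization stack shows a field named dbc of type java.sql.Connection on one of the shell's wrapper objects ($iw), so a connection object is being pulled into the closure that Spark tries to serialize.

AFAIK, you are converting the user and password to strings, and they may come from a different object... but in general your code looks fine. Recheck it.

According to the Spark documentation, you can also put the user and password in the URL itself, like this. Give it a try.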

JDBC To Other Databases - The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
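One way to keep the connection out of the serialized closure is to make sure it only ever exists as a local value inside the per-partition function. The following is a minimal sketch under that assumption; PartitionUpsert, writePartition and the open/bind parameters are hypothetical names, not part of Spark or JDBC. It is written against an arbitrary Iterator so that it depends only on java.sql, and it closes the statement and connection even when a batch fails.

```scala
import java.sql.{Connection, PreparedStatement}

// Hypothetical helper: writes one partition's rows in JDBC batches.
object PartitionUpsert {

  def writePartition[T](rows: Iterator[T], sql: String, batchSize: Int)
                       (open: () => Connection)
                       (bind: (PreparedStatement, T) => Unit): Unit = {
    val conn = open() // created where this function runs, never shipped from the driver
    try {
      val st = conn.prepareStatement(sql)
      try {
        rows.grouped(batchSize).foreach { batch =>
          batch.foreach { row =>
            bind(st, row)   // set all parameters for this row
            st.addBatch()   // queue the row
          }
          st.executeBatch() // send the queued rows to the database
        }
      } finally st.close()
    } finally conn.close()
  }
}

// Hypothetical Spark usage (url, sql and bindRow are assumed to be defined elsewhere):
// df.foreachPartition { (rows: Iterator[Row]) =>
//   PartitionUpsert.writePartition(rows, sql, 50)(
//     () => java.sql.DriverManager.getConnection(url))(
//     (st, row) => bindRow(st, row))
// }
```

In spark-shell this alone may not be enough: if a dbc value from an earlier experiment is still defined at the top level of the session, the closure can still drag it in through the wrapper object, so it may also be necessary to restart the shell or avoid defining the connection at the top level.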

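Update 3: The reason for the null pointer exception is simple: if you call a method on null, it throws a NullPointerException.

For example, in row.getString(8).replaceAll("\000", ""), if row.getString(8) is null and you call replaceAll on it, you get a NullPointerException. You have to check that row.getString(8) is not null before applying replaceAll.

The best way to avoid null pointers is to use Scala's Option.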
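As a small illustration of the Option approach, the NUL-stripping from the question could be wrapped like this. NullSafe and stripNul are hypothetical names; the sketch uses String.replace with "\u0000" instead of replaceAll("\000", ""), on the assumption that a literal replacement is what was intended.

```scala
// Hypothetical helper: null-safe removal of NUL characters from a string.
object NullSafe {

  // Option(s) is None when s is null, so map never runs on a null reference.
  def stripNul(s: String): Option[String] =
    Option(s).map(_.replace("\u0000", ""))
}
```

At the call site the result can then be matched explicitly, for example binding Some(value) with st.setString(idx, value) and None with st.setNull(idx, java.sql.Types.VARCHAR).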

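Another observation: use Scala-style foreach loops rather than traditional Java-style loops.

Note: Please ask a separate question for each requirement. Don't keep adding new problems to the same question; it confuses other readers.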