在 Spark 上更新 Oracle table 时如何避免 ORA-00060(检测到死锁)错误
How to avoid ORA-00060 (deadlock detected) error when updating Oracle table on Spark
我的 spark 作业中有一个奇怪的错误,如果可能的话我会给出一些解释。
因此,我的 Spark 作业从 Hive table 加载数据,将其转换为 Dataframe,然后根据某些列更新现有的 Oracle table。
当数据帧不是很大时,作业运行没有问题。
当数据帧非常大时,作业会运行几个小时然后停止并出现 Oracle 错误:
exception caught: org.apache.spark.SparkException: Job aborted due to stage failure: Task 104 in stage 43.0 failed 4 times, most recent failure: Lost task 104.3 in stage 43.0 (TID 5937, lxpbda55.ra1.intra.groupama.fr, executor 227): java.sql.BatchUpdateException: ORA-00060: deadlock detected while waiting for resource
我的代码是这样工作的:
//This is where the error appears
modification(df_Delta_Modif, champs, conditions, cstProp)
//This is its definition
def modification(df: DataFrame, champs: List[String], conditions: List[String], cstProp: java.util.Properties) {
val url = Parametre_mod.oracleUrl
val options: JDBCOptions = new JDBCOptions(Map("url" -> url, "dbtable" -> Parametre_mod.targetTableBase, "user" -> Parametre_mod.oracleUser,
"password" -> Parametre_mod.oraclePassword, "driver" -> "oracle.jdbc.driver.OracleDriver", "batchSize" -> "30000"))
Crud_mod.modifierbatch(df, options, champs, conditions)
}
//This is the definition of modifierbatch. It starts with establishing a connection to Oracle.
//Which surely works because I use the same thing on other scripts and it works fine
def modifierbatch(df: DataFrame,
options : JDBCOptions,
champs: List[String],
conditions: List[String]) {
val url = options.url
val tables = options.table
val dialect = JdbcDialects_mod.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
getJdbcType(field.dataType, dialect).jdbcNullType
}
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val chainestmt = creerOdreSQLmodificationSimple(champs, conditions, tables) //definition below
val listChamps: List[Int] = champs.map(rddSchema.fieldIndex):::conditions.map(rddSchema.fieldIndex)
df.foreachPartition { iterator =>
//savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
executePartition(getConnection, tables, iterator, rddSchema, nullTypes, batchSize, chainestmt, listChamps, dialect, 0, "")
}
}
//This is the definition of creerOdreSQLmodificationSimple
def creerOdreSQLmodificationSimple(listChamps: List[String], listCondition: List[String], tablecible: String): String = {
val champs = listChamps.map(_.toUpperCase).mkString(" = ?, ")
val condition = listCondition.map(_.toUpperCase).mkString(" = ? and ")
s"""UPDATE ${tablecible} SET ${champs} = ? WHERE ${condition} = ?"""
}
所以如你所见,原理并不是很复杂。我只是使用批处理执行一个 Oracle 函数(更新)。我不知道是什么导致了死锁问题。我没有在 Spark 中使用任何重新分区。
如果您需要更多详细信息,请告诉我。谢谢
通过使用 df.foreachPartition
,看起来数据库访问是在多个并行连接上完成的。
如果是这样,则每个分区中必须存在更新相同行的条件。
您的选择是:
- 消除重叠,这样就不会有两次更新更新同一行。
- 如果您做不到,请安排所有影响给定行的更新保证在同一行中 "partition"。
- 如果您做不到,请在处理条件值之前对其进行排序。例如,如果您的条件类似于
column1 = ? and column2 = ?
并且您的值设置为 { (1, 'R'), (5, 'Q'), (1,'B'), (2 , 'Z') }, 然后对它们进行排序 (1,'B')->(1,'R')->(2,'Z')->(5,'Q').实际上,只要排序顺序明确(没有关系)并且所有分区都以相同的方式对它们的条件进行排序,您如何对它们进行排序并不重要。
- 不要使用
foreachPartition
(即不要尝试并行使用 运行)。实际上这只是上面#2 的变体。
按照选项 3 对工作进行排序将避免死锁,但您将失去 运行 并行处理的大部分好处(因为某些分区会阻塞其他分区)。
我的 spark 作业中有一个奇怪的错误,如果可能的话我会给出一些解释。
因此,我的 Spark 作业从 Hive table 加载数据,将其转换为 Dataframe,然后根据某些列更新现有的 Oracle table。
当数据帧不是很大时,作业运行没有问题。 当数据帧非常大时,作业会运行几个小时然后停止并出现 Oracle 错误:
exception caught: org.apache.spark.SparkException: Job aborted due to stage failure: Task 104 in stage 43.0 failed 4 times, most recent failure: Lost task 104.3 in stage 43.0 (TID 5937, lxpbda55.ra1.intra.groupama.fr, executor 227): java.sql.BatchUpdateException: ORA-00060: deadlock detected while waiting for resource
我的代码是这样工作的:
//This is where the error appears
modification(df_Delta_Modif, champs, conditions, cstProp)
//This is its definition
def modification(df: DataFrame, champs: List[String], conditions: List[String], cstProp: java.util.Properties) {
val url = Parametre_mod.oracleUrl
val options: JDBCOptions = new JDBCOptions(Map("url" -> url, "dbtable" -> Parametre_mod.targetTableBase, "user" -> Parametre_mod.oracleUser,
"password" -> Parametre_mod.oraclePassword, "driver" -> "oracle.jdbc.driver.OracleDriver", "batchSize" -> "30000"))
Crud_mod.modifierbatch(df, options, champs, conditions)
}
//This is the definition of modifierbatch. It starts with establishing a connection to Oracle.
//Which surely works because I use the same thing on other scripts and it works fine
def modifierbatch(df: DataFrame,
options : JDBCOptions,
champs: List[String],
conditions: List[String]) {
val url = options.url
val tables = options.table
val dialect = JdbcDialects_mod.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
getJdbcType(field.dataType, dialect).jdbcNullType
}
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val chainestmt = creerOdreSQLmodificationSimple(champs, conditions, tables) //definition below
val listChamps: List[Int] = champs.map(rddSchema.fieldIndex):::conditions.map(rddSchema.fieldIndex)
df.foreachPartition { iterator =>
//savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
executePartition(getConnection, tables, iterator, rddSchema, nullTypes, batchSize, chainestmt, listChamps, dialect, 0, "")
}
}
//This is the definition of creerOdreSQLmodificationSimple
def creerOdreSQLmodificationSimple(listChamps: List[String], listCondition: List[String], tablecible: String): String = {
val champs = listChamps.map(_.toUpperCase).mkString(" = ?, ")
val condition = listCondition.map(_.toUpperCase).mkString(" = ? and ")
s"""UPDATE ${tablecible} SET ${champs} = ? WHERE ${condition} = ?"""
}
所以如你所见,原理并不是很复杂。我只是使用批处理执行一个 Oracle 函数(更新)。我不知道是什么导致了死锁问题。我没有在 Spark 中使用任何重新分区。
如果您需要更多详细信息,请告诉我。谢谢
通过使用 df.foreachPartition
,看起来数据库访问是在多个并行连接上完成的。
如果是这样,则每个分区中必须存在更新相同行的条件。
您的选择是:
- 消除重叠,这样就不会有两次更新更新同一行。
- 如果您做不到,请安排所有影响给定行的更新保证在同一行中 "partition"。
- 如果您做不到,请在处理条件值之前对其进行排序。例如,如果您的条件类似于
column1 = ? and column2 = ?
并且您的值设置为 { (1, 'R'), (5, 'Q'), (1,'B'), (2 , 'Z') }, 然后对它们进行排序 (1,'B')->(1,'R')->(2,'Z')->(5,'Q').实际上,只要排序顺序明确(没有关系)并且所有分区都以相同的方式对它们的条件进行排序,您如何对它们进行排序并不重要。 - 不要使用
foreachPartition
(即不要尝试并行使用 运行)。实际上这只是上面#2 的变体。
按照选项 3 对工作进行排序将避免死锁,但您将失去 运行 并行处理的大部分好处(因为某些分区会阻塞其他分区)。