Spark 2.2.x issues with JDBC writes to a database on Amazon EMR 5.12.0
At one point, on EMR 5.2.1 (which I had stuck with for a year and a half), I was able to write to a Postgres database from Spark with the following:
# Imports assumed by this snippet (not shown in the original):
from datetime import datetime
import json
import logging

from pyspark.sql import Row
from pyspark.sql.utils import IllegalArgumentException
from py4j.protocol import Py4JJavaError

try:
    df = self._spark.createDataFrame([Row(id=str(self.uuid),
                                          datetime=datetime.now(),
                                          metadata=json.dumps(self._metadata))])
    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
    return self
except (AttributeError, ReferenceError, ValueError,
        Py4JJavaError, IllegalArgumentException) as e:
    logging.error(e)
return None
This does not work on EMR 5.12.0, and I cannot figure out what the problem is. I have looked through JDBC/PySpark threads but have not seen an obvious answer.
Here is the configuration I submit to EMR:
Configurations=[
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
},
"Configurations": []},
{
"Classification": "spark-defaults",
"Properties": {
"spark.rpc.message.maxSize": "768",
"spark.driver.maxResultSize": "4500m",
"spark.jars": "/home/hadoop/postgresql-9.4.1210.jre7.jar",
"spark.executor.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
"spark.driver.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
"spark.driver.userClassPathFirst": "true",
"spark.executor.userClassPathFirst": "true"
},
"Configurations": []
}
],
I have tried many different combinations of these parameters, leaving out one or more of them, but so far nothing has worked. Finally, I also cannot write to the database when I run locally, and I get the following error:
2018-03-13 14:58:55,808 root ERROR An error occurred while calling o1319.jdbc.
: scala.MatchError: null
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
In this case I also tried adding spark.driver.extraClassPath to the SparkSession configuration:
from pyspark.sql import SparkSession

self._spark = SparkSession\
    .builder\
    .appName(self._app_name)\
    .config("spark.driver.extraClassPath", "/path/to/postgresql-9.4-1201.jdbc4.jar")\
    .getOrCreate()
I found the problem. It is this line:
df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
The problem is that it raises an error if the table already exists. To fix it, pass mode='append':
df.write.jdbc(url, table, mode='append', properties={"driver": "org.postgresql.Driver"})
There are a couple of interesting things worth pointing out. I filed a bug report, because the functional way of specifying append leads to the same error:
df.write.mode('append').jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
Also, with Spark on earlier versions of EMR I did not specify an append mode and it could still write to the database. I don't know when that behavior changed, or, more likely, when a bug was fixed at some point. Either way, this was hard to track down, and hopefully my answer helps someone who runs into the same problem.
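For reference, a quick sketch of the four DataFrameWriter save modes and how each behaves when the target JDBC table already exists. The mode names are Spark's own; the descriptions are my paraphrase, and the helper function is just for illustration:

```python
# DataFrameWriter save modes vs. a pre-existing target table.
# Mode names come from Spark; the descriptions are paraphrased.
SAVE_MODES = {
    "error":     "raise an exception (the default)",
    "append":    "insert the new rows into the existing table",
    "overwrite": "drop the existing table, then write",
    "ignore":    "silently skip the write",
}

def describe(mode):
    """Return what a jdbc write does for the given save mode."""
    return SAVE_MODES[mode]
```

So with the default mode, any write against a table that already exists fails, which is why mode='append' makes the error go away.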