Spark 2.2.x issues with jdbc writing to database in Amazon EMR 5.12.0

At one time, using EMR 5.2.1 (which I had stuck with for a year and a half), I was able to write to a postgres database from Spark using the following:

    import json
    import logging
    from datetime import datetime

    from py4j.protocol import Py4JJavaError
    from pyspark.sql import Row
    from pyspark.sql.utils import IllegalArgumentException

    try:
        # Build a single-row DataFrame holding this run's id and metadata
        df = self._spark.createDataFrame([Row(id=str(self.uuid),
                                              datetime=datetime.now(),
                                              metadata=json.dumps(self._metadata))])

        # Write the row to postgres over JDBC
        df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
        return self
    except AttributeError as e:
        logging.error(e)
    except ReferenceError as e:
        logging.error(e)
    except ValueError as e:
        logging.error(e)
    except Py4JJavaError as e:
        logging.error(e)
    except IllegalArgumentException as e:
        logging.error(e)
    return None

This does not work on EMR 5.12.0, and I can't figure out where the problem is. I've looked through JDBC/PySpark writeups but haven't seen anything that looks like an obvious answer.

Here is the configuration I submit to EMR:

    Configurations=[
        {
            "Classification": "spark",
            "Properties": {
                "maximizeResourceAllocation": "true"
            },
            "Configurations": []},
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.rpc.message.maxSize": "768",
                "spark.driver.maxResultSize": "4500m",
                "spark.jars": "/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.executor.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.driver.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.driver.userClassPathFirst": "true",
                "spark.executor.userClassPathFirst": "true"
            },
            "Configurations": []
        }
    ],
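
For reference, this list is what goes into the Configurations argument when the cluster is launched; a minimal sketch using boto3's run_job_flow (the region, cluster name, instance types, and roles below are placeholders, not my actual settings):

    import boto3

    emr = boto3.client("emr", region_name="us-east-1")   # placeholder region

    response = emr.run_job_flow(
        Name="spark-jdbc-cluster",              # placeholder cluster name
        ReleaseLabel="emr-5.12.0",
        Applications=[{"Name": "Spark"}],
        Instances={
            "MasterInstanceType": "m4.xlarge",  # placeholder instance types
            "SlaveInstanceType": "m4.xlarge",
            "InstanceCount": 3,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        Configurations=configurations,          # the list shown above
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
    )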

I've tried many different combinations of these parameters, leaving out one or more, but nothing has worked so far. Finally, if I run locally, I can't write to the database either, and I get the following error message:

    2018-03-13 14:58:55,808 root ERROR An error occurred while calling o1319.jdbc.
    : scala.MatchError: null
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

In that case, I tried adding the spark.driver.extraClassPath configuration:

    self._spark = SparkSession\
        .builder\
        .appName(self._app_name)\
        .config("spark.driver.extraClassPath", "/path/to/postgresql-9.4-1201.jdbc4.jar")\
        .getOrCreate()
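
For a local run, the driver jar can also be supplied through spark.jars, which makes it available to both the driver and the executors; a minimal sketch (the app name and jar path are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession\
        .builder\
        .appName("jdbc-test")\
        .config("spark.jars", "/path/to/postgresql-9.4.1210.jre7.jar")\
        .getOrCreate()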

I found the problem. It was the following line:

    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})

The problem is that if the table already exists, this raises an error. To fix it, add mode='append':

    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"}, mode='append')
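
For completeness, here is what the call looks like with a full JDBC URL and credentials filled in (the host, database, table, and credentials are placeholders):

    url = "jdbc:postgresql://myhost:5432/mydb"   # placeholder host and database

    df.write.jdbc(url,
                  "my_table",
                  mode="append",
                  properties={"driver": "org.postgresql.Driver",
                              "user": "myuser",          # placeholder credentials
                              "password": "mypassword"})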

There are a couple of interesting things to point out. I filed a bug, because the functional version of specifying append results in the same error:

    df.write.mode('append').jdbc(url, table, properties={"driver": "org.postgresql.Driver"})

Also, with Spark on earlier versions of EMR, I did not specify append mode and it could still write to the database. I don't know when that behavior changed, or, more likely, when a bug got fixed at some point. In any case, this was hard to track down, and hopefully my answer helps someone running into the same problem.