AWS EMR Airflow: PostgreSQL Connector
I'm launching an AWS EMR job through Airflow, and it depends on saving data to a PostgreSQL database. Unfortunately, as far as I can tell, the connector is not available in EMR by default, hence the error:
Traceback (most recent call last):
File "my_emr_script.py", line 725, in <module>
main()
File "my_emr_script.py", line 718, in main
.mode("overwrite") \
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1493.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$adapted(JDBCOptions.scala:102)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
How can I make sure EMR launches with the PostgreSQL connector included? I looked for a way to do this via bootstrapping but couldn't find an answer; the official documentation only covers the Presto version.
Edit:
Following @Emerson's suggestion, I uploaded the .jar to an S3 folder and passed it directly through the Configurations in Airflow's JOB_FLOW_OVERRIDES:
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.jar": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar",
},
}
],
In Airflow:
instance_type: str = 'm5.xlarge'
SPARK_STEPS = [
{
'Name': 'emr_test',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
"Args": [
'spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
"s3://{{ var.value.s3_folder }}/scripts/el_emr.py",
'--execution_date',
'{{ ds }}'
],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'EMR Test',
"ReleaseLabel": "emr-6.4.0",
"Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'SPOT',
'InstanceRole': 'MASTER',
'InstanceType': instance_type,
'InstanceCount': 1,
},
{
"Name": "Core",
"Market": "SPOT",
"InstanceRole": "CORE",
"InstanceType": instance_type,
"InstanceCount": 1,
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'Steps': SPARK_STEPS,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
}
},
],
'LogUri': 's3://{{ var.value.s3_folder }}/logs',
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.jar": "s3://{{ var.value.s3_path }}/scripts/postgresql-42.2.5.jar"
},
}
]
}
emr_creator = EmrCreateJobFlowOperator(
task_id='create_emr',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_conn',
emr_conn_id='emr_conn',
region_name='us-west-2',
)
Unfortunately, the problem persists.
In addition, I tried modifying the bootstrap script to download the .jar:
cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
and passing it in the configuration:
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
"spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
},
}
],
In Airflow:
instance_type: str = 'm5.xlarge'
SPARK_STEPS = [
{
'Name': 'emr_test',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
"Args": [
'spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
"s3://{{ var.value.s3_folder }}/scripts/emr.py",
'--execution_date',
'{{ ds }}'
],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'EMR Test',
"ReleaseLabel": "emr-6.4.0",
"Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'SPOT',
'InstanceRole': 'MASTER',
'InstanceType': instance_type,
'InstanceCount': 1,
},
{
"Name": "Core",
"Market": "SPOT",
"InstanceRole": "CORE",
"InstanceType": instance_type,
"InstanceCount": 1,
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'Steps': SPARK_STEPS,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
}
},
],
'LogUri': 's3://{{ var.value.s3_folder }}/logs',
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
"spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
},
}
]
}
emr_creator = EmrCreateJobFlowOperator(
task_id='create_emr',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_conn',
emr_conn_id='emr_conn',
region_name='us-west-2',
)
This, in turn, causes a new error: Spark can no longer read the JSON files and treats them as corrupt:
root
|-- _corrupt_record: string (nullable = true)
Finally, the emr_bootstrap.sh shared by both attempts:
#!/bin/bash -xe
sudo pip3 install -U \
boto3 \
typing
cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
I'm not sure how your EMR cluster is provisioned, but below is how you would configure it.
First, upload the Postgres JDBC jar to an S3 location, then reference it when provisioning the cluster.
If you provision via CloudFormation, you would do something like the following:
EMR:
  Type: AWS::EMR::Cluster
  Properties:
    Applications:
      - Name: Spark
    Configurations:
      - Classification: spark-defaults
        ConfigurationProperties:
          spark.jars: s3://path_to_jar/postgresql-42.2.11.jar
If it's a CLI command, it would be something like this:
aws emr create-cluster ...... --configurations config.json
where config.json might look like this:
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
}
}
]
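Since the question provisions the cluster through Airflow's EmrCreateJobFlowOperator, the equivalent entry in JOB_FLOW_OVERRIDES would presumably look like the sketch below; note that the Spark property is spark.jars (plural), whereas the attempts above used spark.jar:
"Configurations": [
    {
        "Classification": "spark-defaults",
        "Properties": {
            # spark.jars (plural): Spark fetches this jar from S3 and adds it
            # to the driver and executor classpaths when the job starts.
            "spark.jars": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar"
        },
    }
],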
Edit:
Having seen your edited question, I can see your spark-submit arguments (the SPARK_STEPS variable). In that section, just add two more items as shown below:
'--jars',
's3://pathtodriver/postgresdriver.jar'
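Applied to the SPARK_STEPS from the question, the step would look roughly like the sketch below (the jar location is assumed to be the same S3 path used earlier; --jars must come before the application script):
SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                # Ship the PostgreSQL JDBC driver to the driver and executors.
                '--jars',
                's3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar',
                's3://{{ var.value.s3_folder }}/scripts/el_emr.py',
                '--execution_date',
                '{{ ds }}'
            ],
        },
    }
]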