如何使用 AWS Glue 运行 任意/DDL SQL 语句或存储过程
How to run arbitrary / DDL SQL statements or stored procedures using AWS Glue
是否可以从 AWS Glue python 作业中执行任意 SQL 命令,例如 ALTER TABLE?我知道我可以使用它从表中读取数据,但是有没有办法执行其他数据库特定命令?
我需要将数据提取到目标数据库中,然后 运行 一些 ALTER 命令。
视情况而定。如果您使用 redshift 作为目标,您可以选择指定 pre 和 post 操作作为连接选项的一部分。您可以在那里指定更改操作。然而,对于其余的目标类型,您可能需要使用一些 python 模块,如 pg8000(在 Postgres 的情况下)和其他
因此,在进行广泛研究并向 AWS 支持开立案例后,他们告诉我目前无法通过 Python shell 或 Glue pyspark 作业。但我只是尝试了一些有创意的东西而且它奏效了!这个想法是使用 py4j sparks 已经依赖并利用标准 java sql 包。
这种方法的两大好处:
这样做的一个巨大好处是,您可以将数据库连接定义为 Glue 数据连接,并在其中保留 jdbc 详细信息和凭据,而无需在 Glue 代码中对它们进行硬编码。我下面的示例通过调用 glueContext.extract_jdbc_conf('your_glue_data_connection_name')
来获取 jdbc url 和凭据,在 Glue 中定义。
如果您需要 运行 SQL 在受支持的开箱即用 Glue 数据库上执行命令,您甚至不需要 use/pass jdbc 该数据库的驱动程序 - 只需确保为该数据库设置 Glue 连接并将该连接添加到你的 Glue 作业 - Glue 将上传正确的数据库驱动程序 jars。
请记住下面这段代码是由驱动程序进程执行的,不能由 Spark workers/executors 执行。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# dw-poc-dev spark test
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure, in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
cstmt.setString("job_name", "testjob");
results = cstmt.execute();
conn.close()
我修改了 mishkin 共享的代码,但它对我不起作用。因此,在进行了一些故障排除后,我意识到目录中的连接不起作用。所以我不得不手动修改它并稍微调整代码。现在,它的工作正常但最终会出现异常,因为它无法将 java 结果转换为 python 结果。我做了一个变通,所以谨慎使用。
below is my code.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
print('Trying to connect to DB')
conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword')
print('Trying to connect to DB success!')
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure, in this case I call sp_start_job
stmt = conn.createStatement();
#cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");
print('Call to proc trying ')
#cstmt.setString("job_name", "testjob");
try:
rs = stmt.executeQuery('call mySchemaName.my_storedproc()');
except:
print("An exception occurred but proc has run")
#results = cstmt.execute();`enter code here`
conn.close()
我终于在几个小时后完成了这项工作,希望以下内容对您有所帮助。我的脚本深受早期回复的影响,谢谢。
先决条件:
- 在尝试任何脚本之前,您需要配置和测试 Glue 连接。
- 设置您的 AWS Glue 作业时,请使用 Spark、Glue 2.0 或更高版本以及Python 版本 3。
- 我建议只为 2 个工作线程配置此作业以节省成本;大部分工作将由数据库完成,而不是胶水。
- 以下是使用 AWS RDS PostgreSQL 实例测试的,但希望它足够灵活以适用于其他数据库。
- 脚本需要在脚本顶部附近更新 3 个参数(glue_connection_name、database_name 和 stored_proc)。
- JOB_NAME、连接字符串和凭据由脚本检索,无需提供。
- 如果您的存储过程将 return 数据集,则将 executeUpdate 替换为 executeQuery。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
job.commit()
logger = glueContext.get_logger()
logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
stmt = conn.createStatement();
rs = stmt.executeUpdate('call ' + stored_proc);
logger.info("Finished")
如果将连接对象附加到粘合作业,则可以轻松获取连接设置:
glue_client = boto3.client('glue')
getjob=glue_client.get_job(JobName=args["JOB_NAME"])
connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0])
conn_name = connection_settings['Connection']['Name']
df = glueContext.extract_jdbc_conf(conn_name)
是否可以从 AWS Glue python 作业中执行任意 SQL 命令,例如 ALTER TABLE?我知道我可以使用它从表中读取数据,但是有没有办法执行其他数据库特定命令?
我需要将数据提取到目标数据库中,然后 运行 一些 ALTER 命令。
视情况而定。如果您使用 redshift 作为目标,您可以选择指定 pre 和 post 操作作为连接选项的一部分。您可以在那里指定更改操作。然而,对于其余的目标类型,您可能需要使用一些 python 模块,如 pg8000(在 Postgres 的情况下)和其他
因此,在进行广泛研究并向 AWS 支持开立案例后,他们告诉我目前无法通过 Python shell 或 Glue pyspark 作业。但我只是尝试了一些有创意的东西而且它奏效了!这个想法是使用 py4j sparks 已经依赖并利用标准 java sql 包。
这种方法的两大好处:
这样做的一个巨大好处是,您可以将数据库连接定义为 Glue 数据连接,并在其中保留 jdbc 详细信息和凭据,而无需在 Glue 代码中对它们进行硬编码。我下面的示例通过调用
glueContext.extract_jdbc_conf('your_glue_data_connection_name')
来获取 jdbc url 和凭据,在 Glue 中定义。如果您需要 运行 SQL 在受支持的开箱即用 Glue 数据库上执行命令,您甚至不需要 use/pass jdbc 该数据库的驱动程序 - 只需确保为该数据库设置 Glue 连接并将该连接添加到你的 Glue 作业 - Glue 将上传正确的数据库驱动程序 jars。
请记住下面这段代码是由驱动程序进程执行的,不能由 Spark workers/executors 执行。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# dw-poc-dev spark test
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure, in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
cstmt.setString("job_name", "testjob");
results = cstmt.execute();
conn.close()
我修改了 mishkin 共享的代码,但它对我不起作用。因此,在进行了一些故障排除后,我意识到目录中的连接不起作用。所以我不得不手动修改它并稍微调整代码。现在,它的工作正常但最终会出现异常,因为它无法将 java 结果转换为 python 结果。我做了一个变通,所以谨慎使用。
below is my code.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
print('Trying to connect to DB')
conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword')
print('Trying to connect to DB success!')
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure, in this case I call sp_start_job
stmt = conn.createStatement();
#cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");
print('Call to proc trying ')
#cstmt.setString("job_name", "testjob");
try:
rs = stmt.executeQuery('call mySchemaName.my_storedproc()');
except:
print("An exception occurred but proc has run")
#results = cstmt.execute();`enter code here`
conn.close()
我终于在几个小时后完成了这项工作,希望以下内容对您有所帮助。我的脚本深受早期回复的影响,谢谢。
先决条件:
- 在尝试任何脚本之前,您需要配置和测试 Glue 连接。
- 设置您的 AWS Glue 作业时,请使用 Spark、Glue 2.0 或更高版本以及Python 版本 3。
- 我建议只为 2 个工作线程配置此作业以节省成本;大部分工作将由数据库完成,而不是胶水。
- 以下是使用 AWS RDS PostgreSQL 实例测试的,但希望它足够灵活以适用于其他数据库。
- 脚本需要在脚本顶部附近更新 3 个参数(glue_connection_name、database_name 和 stored_proc)。
- JOB_NAME、连接字符串和凭据由脚本检索,无需提供。
- 如果您的存储过程将 return 数据集,则将 executeUpdate 替换为 executeQuery。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
job.commit()
logger = glueContext.get_logger()
logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
stmt = conn.createStatement();
rs = stmt.executeUpdate('call ' + stored_proc);
logger.info("Finished")
如果将连接对象附加到粘合作业,则可以轻松获取连接设置:
glue_client = boto3.client('glue')
getjob=glue_client.get_job(JobName=args["JOB_NAME"])
connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0])
conn_name = connection_settings['Connection']['Name']
df = glueContext.extract_jdbc_conf(conn_name)