AWS Glue - 在插入之前截断目标 postgres table
AWS Glue - Truncate destination postgres table prior to insert
我正在尝试在插入之前截断一个 postgres 目标 table,并且一般来说,试图利用已经在 GLUE 中创建的连接来触发外部函数。
有没有人做到过?
我已经尝试了 DROP/ TRUNCATE
场景,但是无法使用已经在 Glue 中创建的连接来完成,但是使用纯 Python PostgreSQL 驱动程序,pg8000。
- 下载pg8000 from pypitar
- 在根文件夹中创建一个空
__init__.py
- 压缩内容并上传到 S3
- 在作业
Python lib path
中引用 zip 文件
- 将数据库连接详细信息设置为作业参数(确保在所有键名前加上
--
)。勾选 "Server-side encryption" 方框。
然后你可以简单地创建一个连接并执行SQL。
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pg8000
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'PW',
'HOST',
'USER',
'DB'
])
# ...
# Create Spark & Glue context
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ...
config_port = 5432
conn = pg8000.connect(
database=args['DB'],
user=args['USER'],
password=args['PW'],
host=args['HOST'],
port=config_port
)
query = "TRUNCATE TABLE {0};".format(".".join([schema, table]))
cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()
conn.close()
在执行@thenaturalist 回复的步骤 (4) 后,
sc.addPyFile("/home/glue/downloads/python/pg8000.zip")
import pg8000
在开发端点(zeppelin 笔记本)为我工作
更多信息:https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html
澄清@thenaturalist 的 zip 说明,因为我仍在为此苦苦挣扎
下载 pg8000 from pypi.org 的 tar.gz 并解压。
压缩内容,使您具有以下结构
pg8000-1.15.3.zip
|
| -- pg8000 <dir>
| -- __init__.py
| -- _version.py <optional>
| -- core.py
上传到 s3 然后你应该可以做一个简单的 import pg8000
.
NOTE: scramp is also required at the moment so follow the same procedure as above to include the scramp module. But you don't need to import it.
我正在尝试在插入之前截断一个 postgres 目标 table,并且一般来说,试图利用已经在 GLUE 中创建的连接来触发外部函数。
有没有人做到过?
我已经尝试了 DROP/ TRUNCATE
场景,但是无法使用已经在 Glue 中创建的连接来完成,但是使用纯 Python PostgreSQL 驱动程序,pg8000。
- 下载pg8000 from pypitar
- 在根文件夹中创建一个空
__init__.py
- 压缩内容并上传到 S3
- 在作业
Python lib path
中引用 zip 文件 - 将数据库连接详细信息设置为作业参数(确保在所有键名前加上
--
)。勾选 "Server-side encryption" 方框。
然后你可以简单地创建一个连接并执行SQL。
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pg8000
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'PW',
'HOST',
'USER',
'DB'
])
# ...
# Create Spark & Glue context
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ...
config_port = 5432
conn = pg8000.connect(
database=args['DB'],
user=args['USER'],
password=args['PW'],
host=args['HOST'],
port=config_port
)
query = "TRUNCATE TABLE {0};".format(".".join([schema, table]))
cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()
conn.close()
在执行@thenaturalist 回复的步骤 (4) 后,
sc.addPyFile("/home/glue/downloads/python/pg8000.zip")
import pg8000
在开发端点(zeppelin 笔记本)为我工作
更多信息:https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html
澄清@thenaturalist 的 zip 说明,因为我仍在为此苦苦挣扎
下载 pg8000 from pypi.org 的 tar.gz 并解压。
压缩内容,使您具有以下结构
pg8000-1.15.3.zip
|
| -- pg8000 <dir>
| -- __init__.py
| -- _version.py <optional>
| -- core.py
上传到 s3 然后你应该可以做一个简单的 import pg8000
.
NOTE: scramp is also required at the moment so follow the same procedure as above to include the scramp module. But you don't need to import it.