自动将数据从 s3 批量加载到 Aurora MySQL RDS 实例
Automate bulk loading of data from s3 to Aurora MySQL RDS instance
我对 AWS 比较陌生,所以我不确定该怎么做,
我在 s3 上有 CSV 文件,并且我已经在 RDS 上设置了 Aurora 实例。我无法弄清楚的是如何自动批量加载数据,本质上是使用 AWS Glue 之类的东西做 LOAD DATA FROM s3
类的事情。
我也使用了 s3 的 Glue 本地东西到 RDS,但它本质上是通过 JDBC 连接插入 RDS 的一堆,这对于大型数据集来说也非常慢。
我可以独立完成 运行 RDS 上的命令,但我不想这样做并想利用 Glue。我还查看了为 Python 使用 MySQL 连接器,但 Glue 本身仅支持 Python 2.7,这是我不想使用的东西。
如有任何帮助,我们将不胜感激。
方法如上所述,有一个 S3 事件触发器和一个在 s3 bucket/object 位置侦听的 lambda 作业。一旦文件上传到 s3 位置,lambda 作业就会 运行,在 lambda 中,您可以配置为调用 AWS Glue 作业。这正是我们所做的,并且已经成功上线。 Lambda 有 15 分钟的生命周期,trigger/start Glue 作业每分钟应该更少。
请在此找到示例来源以供参考。
from __future__ import print_function
import json
import boto3
import time
import urllib
print('Loading function')
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="your-glue-job-name here"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
要创建 Lambda 函数,请转到 AWS Lambdra->从头开始创建新函数->Select S3 事件,然后根据需要配置 S3 存储桶位置和前缀。然后复制粘贴上面的代码示例,内联代码区,根据需要配置胶水作业名称。请确保您拥有所有必需的 IAM roles/access 设置。
粘合作业应该有连接到您的 Aurora 的规定,然后您可以使用 Aurora 提供的 "LOAD FROM S3....." 命令。确保所有参数组 settings/configurations 都按需要完成。
如果有任何问题,请告诉我。
更新:从 S3 加载的示例代码片段:
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, col4=@var4;"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
我对 AWS 比较陌生,所以我不确定该怎么做,
我在 s3 上有 CSV 文件,并且我已经在 RDS 上设置了 Aurora 实例。我无法弄清楚的是如何自动批量加载数据,本质上是使用 AWS Glue 之类的东西做 LOAD DATA FROM s3
类的事情。
我也使用了 s3 的 Glue 本地东西到 RDS,但它本质上是通过 JDBC 连接插入 RDS 的一堆,这对于大型数据集来说也非常慢。
我可以独立完成 运行 RDS 上的命令,但我不想这样做并想利用 Glue。我还查看了为 Python 使用 MySQL 连接器,但 Glue 本身仅支持 Python 2.7,这是我不想使用的东西。
如有任何帮助,我们将不胜感激。
方法如上所述,有一个 S3 事件触发器和一个在 s3 bucket/object 位置侦听的 lambda 作业。一旦文件上传到 s3 位置,lambda 作业就会 运行,在 lambda 中,您可以配置为调用 AWS Glue 作业。这正是我们所做的,并且已经成功上线。 Lambda 有 15 分钟的生命周期,trigger/start Glue 作业每分钟应该更少。
请在此找到示例来源以供参考。
from __future__ import print_function
import json
import boto3
import time
import urllib
print('Loading function')
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="your-glue-job-name here"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
要创建 Lambda 函数,请转到 AWS Lambdra->从头开始创建新函数->Select S3 事件,然后根据需要配置 S3 存储桶位置和前缀。然后复制粘贴上面的代码示例,内联代码区,根据需要配置胶水作业名称。请确保您拥有所有必需的 IAM roles/access 设置。
粘合作业应该有连接到您的 Aurora 的规定,然后您可以使用 Aurora 提供的 "LOAD FROM S3....." 命令。确保所有参数组 settings/configurations 都按需要完成。
如果有任何问题,请告诉我。
更新:从 S3 加载的示例代码片段:
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, col4=@var4;"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()