使用 AWS Lambda 从 S3 读取 csv 并插入 MySQL table

Reading csv from S3 and inserting into a MySQL table with AWS Lambda

我正在尝试自动将 csv 加载到 MySQL table 中,当它被接收到 S3 存储桶中时。

我的策略是 S3 在将文件接收到指定的存储桶时启动一个事件(我们称之为 'bucket-file')。这个事件被通知给 AWS Lambda 函数,该函数将下载并处理文件,将每一行插入 MySql table(我们称之为 'target_table')。

我们必须考虑到 RDS 在 VPC 中。

bucket当前权限配置为:

{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "PublicReadForGetBucketObjects",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::bucket-file/*"
        }
    ]
}

我创建了一个具有以下策略的角色,AmazonS3FullAccess 和 AWSLambdaVPCAccessExecutionRole 附加到 AWS Lambda 函数。

lambda 代码是:

from __future__ import print_function
import boto3
import logging
import os
import sys
import uuid
import pymysql
import csv
import rds_config


rds_host  = rds_config.rds_host
name = rds_config.db_username
password = rds_config.db_password
db_name = rds_config.db_name


logger = logging.getLogger()
logger.setLevel(logging.INFO)

try:
    conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
except Exception as e:
    logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
    logger.error(e)
    sys.exit()

logger.info("SUCCESS: Connection to RDS mysql instance succeeded")

s3_client = boto3.client('s3')

def handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key'] 
    download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)

    s3_client.download_file(bucket, key,download_path)

    csv_data = csv.reader(file( download_path))

    with conn.cursor() as cur:
        for idx, row in enumerate(csv_data):

            logger.info(row)
            try:
                cur.execute('INSERT INTO target_table(column1, column2, column3)' \
                                'VALUES("%s", "%s", "%s")'
                                , row)
            except Exception as e:
                logger.error(e)

            if idx % 100 == 0:
                conn.commit()

        conn.commit()

    return 'File loaded into RDS:' + str(download_path)

我一直在测试该功能,S3 在上传文件时发送事件,Lambda 连接到 RDS 实例并收到通知。我已经检查过存储桶名称是 'bucket-file' 并且文件名也是正确的。问题是当函数到达行 s3_client.download_file(bucket, key,download_path) 时卡住,直到达到 lamdba 过期时间。

看日志说:

[INFO]  2017-01-24T14:36:52.102Z    SUCCESS: Connection to RDS mysql instance succeeded
[INFO]  2017-01-24T14:36:53.282Z    Starting new HTTPS connection (1): bucket-files.s3.amazonaws.com
[INFO]  2017-01-24T14:37:23.223Z    Starting new HTTPS connection (2): bucket-files.s3.amazonaws.com
2017-01-24T14:37:48.684Z Task timed out after 60.00 seconds

我还读到,如果您在 VPC 中工作,为了访问 S3 存储桶,您必须创建一个 VPC 端点,以授予对该子网 S3 的访问权限。我也尝试过这个解决方案,但结果是一样的。

我会很感激一些想法。

提前致谢!

我终于明白了!

问题是 VPC 问题。正如我所说,我创建了一个 VPC 端点以使 S3 服务可以从我的 VPC 访问,但是我的路由 table 配置错误。

因此,总而言之,如果您在使用 lambda 的 VPC 中工作并且想要访问 S3,则需要创建一个 VPC 端点。此外,如果你想访问你的VPC之外的任何其他互联网服务,你需要配置一个NAT网关。