如何将 S3 存储桶内容(.csv 格式)转换为 AWS Lambda 中的数据框
How to convert S3 bucket content(.csv format) into a dataframe in AWS Lambda
我正在尝试通过 lambda 将 S3 数据(csv 文件)摄取到 RDS(MSSQL)。示例代码:
s3 = boto3.client('s3')
if event:
file_obj = event["Records"][0]
bucketname = str(file_obj["s3"]["bucket"]["name"])
csv_filename = unquote_plus(str(file_obj["s3"]["object"]["key"]))
print("Filename: ", csv_filename)
csv_fileObj = s3.get_object(Bucket=bucketname, Key=csv_filename)
file_content = csv_fileObj["Body"].read().decode("utf-8").split()
我试过将我的 csv 内容放入列表但没有成功。
results = []
for row in csv.DictReader(file_content):
results.append(row.values())
print(results)
print(file_content)
return {
'statusCode': 200,
'body': json.dumps('S3 file processed')
}
我是否可以将“file_content
”转换为 Lambda 中的数据框?我有多个列要加载。
后面我会按照这种方式将数据加载到RDS
import pyodbc
import pandas as pd
# insert data from csv file into dataframe(df).
server = 'yourservername'
database = 'AdventureWorks'
username = 'username'
password = 'yourpassword'
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = cnxn.cursor()
# Insert Dataframe into SQL Server:
for index, row in df.iterrows():
cursor.execute("INSERT INTO HumanResources.DepartmentTest (DepartmentID,Name,GroupName) values(?,?,?)", row.DepartmentID, row.Name, row.GroupName)
cnxn.commit()
cursor.close()
任何人都可以建议如何去做吗?
您可以使用 io.BytesIO
将字节数据放入内存,然后使用 pandasread_csv
将其转换为数据帧。请注意,数据帧有一些奇怪的 SSL 下载限制,这会导致下载大于 2GB 的数据时出现问题。这就是我在下面的代码中使用这种分块的原因。
import io
obj = s3.get_object(Bucket=bucketname, Key=csv_filename)
# This should prevent the 2GB download limit from a python ssl internal
chunks = (chunk for chunk in obj["Body"].iter_chunks(chunk_size=1024**3))
data = io.BytesIO(b"".join(chunks)) # This keeps everything fully in memory
df = pd.read_csv(data) # here you can provide also some necessary args and kwargs
看来您的目标是将 CSV 文件的内容从 Amazon S3 加载到 SQL 服务器。
您可以在不使用 Dataframes 的情况下执行此操作:
- 循环事件记录(多个可以passed-in)
- 对于每个对象:
- 将对象下载到
/tmp/
- 使用Python
CSVReader
循环遍历文件内容
- 生成
INSERT
语句以将数据插入 SQL 服务器 table
您也可以考虑使用 aws-data-wrangler: Pandas on AWS,它可用作 Lambda 层。
我正在尝试通过 lambda 将 S3 数据(csv 文件)摄取到 RDS(MSSQL)。示例代码:
s3 = boto3.client('s3')
if event:
file_obj = event["Records"][0]
bucketname = str(file_obj["s3"]["bucket"]["name"])
csv_filename = unquote_plus(str(file_obj["s3"]["object"]["key"]))
print("Filename: ", csv_filename)
csv_fileObj = s3.get_object(Bucket=bucketname, Key=csv_filename)
file_content = csv_fileObj["Body"].read().decode("utf-8").split()
我试过将我的 csv 内容放入列表但没有成功。
results = []
for row in csv.DictReader(file_content):
results.append(row.values())
print(results)
print(file_content)
return {
'statusCode': 200,
'body': json.dumps('S3 file processed')
}
我是否可以将“file_content
”转换为 Lambda 中的数据框?我有多个列要加载。
后面我会按照这种方式将数据加载到RDS
import pyodbc
import pandas as pd
# insert data from csv file into dataframe(df).
server = 'yourservername'
database = 'AdventureWorks'
username = 'username'
password = 'yourpassword'
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = cnxn.cursor()
# Insert Dataframe into SQL Server:
for index, row in df.iterrows():
cursor.execute("INSERT INTO HumanResources.DepartmentTest (DepartmentID,Name,GroupName) values(?,?,?)", row.DepartmentID, row.Name, row.GroupName)
cnxn.commit()
cursor.close()
任何人都可以建议如何去做吗?
您可以使用 io.BytesIO
将字节数据放入内存,然后使用 pandasread_csv
将其转换为数据帧。请注意,数据帧有一些奇怪的 SSL 下载限制,这会导致下载大于 2GB 的数据时出现问题。这就是我在下面的代码中使用这种分块的原因。
import io
obj = s3.get_object(Bucket=bucketname, Key=csv_filename)
# This should prevent the 2GB download limit from a python ssl internal
chunks = (chunk for chunk in obj["Body"].iter_chunks(chunk_size=1024**3))
data = io.BytesIO(b"".join(chunks)) # This keeps everything fully in memory
df = pd.read_csv(data) # here you can provide also some necessary args and kwargs
看来您的目标是将 CSV 文件的内容从 Amazon S3 加载到 SQL 服务器。
您可以在不使用 Dataframes 的情况下执行此操作:
- 循环事件记录(多个可以passed-in)
- 对于每个对象:
- 将对象下载到
/tmp/
- 使用Python
CSVReader
循环遍历文件内容 - 生成
INSERT
语句以将数据插入 SQL 服务器 table
- 将对象下载到
您也可以考虑使用 aws-data-wrangler: Pandas on AWS,它可用作 Lambda 层。