s3 上的代码优化读取 csv 并摄取回 s3 存储桶
Code Optimization on s3 read csv and ingest back to s3 bucket
ddict = defaultdict(set)
file_str = query_csv_s3(s3, BUCKET_NAME, filename, sql_exp, use_header)
# read CSV to dataframe
df = pd.read_csv(StringIO(file_str))
fdf = df.drop_duplicates(subset='cleverTapId', keep='first')
fdf.dropna(inplace=True)
col_one_list = fdf['identity'].tolist()
col_two_list = fdf['cleverTapId'].tolist()
for k, v in zip(col_one_list, col_two_list):
ddict[k].add(v)
for imkey in ddict:
im_length = len(str(imkey))
if im_length == 9:
if len(ddict[imkey]) == 1:
for value in ddict[imkey]:
tdict = {imkey:value}
write_to_csv(FILE_NAME,tdict)
else:
ctlist = list(ddict[imkey])
snp_dict = {imkey:'|'.join(ctlist)}
write_to_csv(SNAP_FILE_NAME, snp_dict)
elif im_length > 0:
if len(ddict[imkey]) == 1:
for value in ddict[imkey]:
fdict = {imkey:value}
write_to_csv(FRAUD_FILE_NAME,fdict)
else:
pass
# mult_ct = list(ddict[imkey])
# mydict = {imkey:','.join(mult_ct)}
# write_to_csv(MY_FILENAME,mydict)
else:
pass
这里是write_to_csv
:
def write_to_csv(filename,mdict):
file_exists = os.path.isfile(filename)
with open(filename,'a',newline='') as csvfile:
headers = ['IM No', 'CT ID']
writer = csv.DictWriter(
csvfile,
delimiter=',',
lineterminator='\n',
fieldnames=headers
)
if not file_exists:
writer.writeheader()
for key in mdict:
writer.writerow({'IM No': key, 'CT ID': mdict[key]})
我正在使用 s3 select.
读取包含 2 列的 csv 文件
我正在生成 1 IM :1 CTID,一对多和多对多文件并将其上传回 s3 存储桶
处理从 s3 读取并上传回的 530 MB 文件大小需要 18 小时,我该如何进一步优化它?
这基本上是一个猜测,因为我不能 运行 你的代码。您将数据写入 CSV 文件的方式极其低效。
I/O 对 SSD 或磁盘的操作是 IT 中成本较高的操作之一。现在,您为要追加的每一行打开一个文件,然后追加它并再次关闭该文件。
这意味着对于一个 530 MB 的文件,您可能要执行数百万次这些昂贵的操作。
如果您查看任务管理器中的性能选项卡,您可能会看到磁盘使用率非常高。
在内存中缓冲其中的一些(如果 RAM 足够大,则缓冲全部)并在最后将它们刷新到磁盘会更有效。
大致是这样的:
FRAUD_FILE_CONTENTS = []
# Computation stuff
FRAU_FILE_CONTENTS.append({"my": "dict"})
# After the loop
with open(FRAUD_FILE_NAME, "w"):
# Write to CSV
ddict = defaultdict(set)
file_str = query_csv_s3(s3, BUCKET_NAME, filename, sql_exp, use_header)
# read CSV to dataframe
df = pd.read_csv(StringIO(file_str))
fdf = df.drop_duplicates(subset='cleverTapId', keep='first')
fdf.dropna(inplace=True)
col_one_list = fdf['identity'].tolist()
col_two_list = fdf['cleverTapId'].tolist()
for k, v in zip(col_one_list, col_two_list):
ddict[k].add(v)
for imkey in ddict:
im_length = len(str(imkey))
if im_length == 9:
if len(ddict[imkey]) == 1:
for value in ddict[imkey]:
tdict = {imkey:value}
write_to_csv(FILE_NAME,tdict)
else:
ctlist = list(ddict[imkey])
snp_dict = {imkey:'|'.join(ctlist)}
write_to_csv(SNAP_FILE_NAME, snp_dict)
elif im_length > 0:
if len(ddict[imkey]) == 1:
for value in ddict[imkey]:
fdict = {imkey:value}
write_to_csv(FRAUD_FILE_NAME,fdict)
else:
pass
# mult_ct = list(ddict[imkey])
# mydict = {imkey:','.join(mult_ct)}
# write_to_csv(MY_FILENAME,mydict)
else:
pass
这里是write_to_csv
:
def write_to_csv(filename,mdict):
file_exists = os.path.isfile(filename)
with open(filename,'a',newline='') as csvfile:
headers = ['IM No', 'CT ID']
writer = csv.DictWriter(
csvfile,
delimiter=',',
lineterminator='\n',
fieldnames=headers
)
if not file_exists:
writer.writeheader()
for key in mdict:
writer.writerow({'IM No': key, 'CT ID': mdict[key]})
我正在使用 s3 select.
读取包含 2 列的 csv 文件我正在生成 1 IM :1 CTID,一对多和多对多文件并将其上传回 s3 存储桶
处理从 s3 读取并上传回的 530 MB 文件大小需要 18 小时,我该如何进一步优化它?
这基本上是一个猜测,因为我不能 运行 你的代码。您将数据写入 CSV 文件的方式极其低效。
I/O 对 SSD 或磁盘的操作是 IT 中成本较高的操作之一。现在,您为要追加的每一行打开一个文件,然后追加它并再次关闭该文件。 这意味着对于一个 530 MB 的文件,您可能要执行数百万次这些昂贵的操作。
如果您查看任务管理器中的性能选项卡,您可能会看到磁盘使用率非常高。
在内存中缓冲其中的一些(如果 RAM 足够大,则缓冲全部)并在最后将它们刷新到磁盘会更有效。
大致是这样的:
FRAUD_FILE_CONTENTS = []
# Computation stuff
FRAU_FILE_CONTENTS.append({"my": "dict"})
# After the loop
with open(FRAUD_FILE_NAME, "w"):
# Write to CSV