python 代码仅从 gcs 存储桶中读取部分 csv 文件

python code only reading partial csv file from gcs bucket

我在从 Google 云存储桶中读取 csv 文件并将其写入同一存储桶中不同文件夹中的文件时遇到一个奇怪的问题。

我有一个名为 test.csv 的 csv 文件,其中包含 1000001 行。我试图用空白 space 替换每一行中的 " 并写入一个名为 cleansed_test.csv.

的文件

我在本地测试了我的代码并按预期工作。

下面是我在本地使用的代码

import pandas as pd
import csv
import re
new_lines=[]
new_lines_error_less_cols=[]
new_lines_error_more_cols=[]
with open('c:\Users\test_file.csv','r') as f:
    lines = f.readlines()
    print(len(lines))
    for line in lines:
        new_line = re.sub('["]','',line)
        new_line= new_line.strip()
        new_lines.append(new_line)
#         elif line.count('|') < 295:
#             new_line_error_less = re.sub('["]','inches',line)
#             new_line_error_less= new_line_error_less.strip()
#             new_lines_error_less_cols.append(new_line_error_less)
#         else:
#             new_line_error_more = re.sub('["]','inches',line)
#             new_line_error_more= new_line_error_more.strip()
#             new_lines_error_more_cols.append(new_line_error_more)
    new_data = pd.DataFrame(new_lines)
    print(new_data.info())
    #new_data.to_csv('c:\cleansed_file.csv',header=None,index=False,encoding='utf-8')

但是当我尝试在 gcs 存储桶中执行相同的文件时,只读取了 67514 行 我在 composer

中使用的代码
def replace_quotes(project,bucket,**context):
        import pandas as pd
        import numpy as np
        import csv
        import os
        import re
        import gcsfs
        import io
        fs = gcsfs.GCSFileSystem(project='project_name')
        updated_file_list = fs.ls('bucketname/FULL')
        updated_file_list = [ x for x in updated_file_list if "filename" in x ]
        new_lines=[]
        new_lines_error_less_cols=[]
        new_lines_error_more_cols=[]
        for f in updated_file_list:
            file_name = os.path.splitext(f)[0]
            parse_names = file_name.split('/')
            filename = parse_names[2]
            bucketname  = parse_names[0]
            with fs.open("gs://"+f,'r') as pf:
                lines = pf.readlines()
                print("length of lines----->",len(lines))#even here showing 67514
                for line in lines:
                    new_line = re.sub('["]','',line)
                    new_line= new_line.strip()
                    new_lines.append(new_line)
            new_data = pd.DataFrame(new_lines)
            #new_data.to_csv("gs://"+bucketname+"/ERROR_FILES/cleansed_"+filename+".csv",escapechar='',header = None,index=False,encoding='utf-8',quoting=csv.QUOTE_NONE)

我还在存储桶中看到文件 test.csv 和 cleansed_test.csv 的大小相同。

我唯一能想到的是,因为文件是在 gcs 存储桶中压缩的,所以我应该以不同的方式打开文件吗?因为当我将文件下载到本地时,它们比我在存储桶中看到的要大很多。

请指教

谢谢。

我想你可以通过使用dataframe列对象的替换方法,并指定bool true参数来实现你想要的(否则字段字符串必须完全匹配匹配字符的条件)。通过这种方式,您可以简单地对每一列进行迭代并替换不需要的字符串,然后用新修改的字符串重写每一列。

我稍微修改了您的代码并 运行 在我的 GCP 虚拟机上修改了它。如您所见,我更喜欢使用 Pandas.read_csv() 方法,因为 GCSF 向我抛出一些错误。代码似乎完成了它的工作,因为我最初通过替换一个虚拟的公共字符串进行了测试,并且运行顺利。

这是您修改后的代码。另请注意,我对阅读部分进行了一些重构,因为它没有正确匹配存储桶中 csv 的路径。

from pandas.api.types import is_string_dtype
import pandas as pd
import numpy as np
import csv
import os
import re
import gcsfs
import io
fs = gcsfs.GCSFileSystem(project='my-project')
updated_file_list = fs.ls('test-bucket/')
updated_file_list = [ x for x in updated_file_list if "simple.csv" in x ]
new_lines=[]
new_lines_error_less_cols=[]
new_lines_error_more_cols=[]


for f in updated_file_list:
        file_name = os.path.splitext(f)[0]
        print(f, file_name)
        parse_names = file_name.split('/')
        filename = parse_names[1]
        bucketname  = parse_names[0]
        with fs.open("gs://"+f) as pf:
            df = pd.read_csv(pf)
            #print(df.head(len(df)))  #To check results
            for col in df:
                if is_string_dtype(df[col]):
                    df[col] = df[col].replace(to_replace=['"'], value= '', regex= True)
            #print(df.head(len(df))) #To check results

        new_data = pd.DataFrame(df)
        #new_data.to_csv("gs://"+bucketname+"/ERROR_FILES/cleansed_"+filename+".csv",escapechar='',header = None,index=False,encoding='utf-8',quoting=csv.QUOTE_NONE

希望我的努力解决了你的问题...

对于任何好奇的人来说,这是如何扩充具有扩展名 .csv 但实际上是用 gzip 压缩的文件的。 gsutil 猫 gs://BUCKET/File_Name.csv |猫 | gsutil cp - gs://BUCKET/Newfile.csv

我在这里看到的唯一问题是 我不能使用通配符,或者我应该说直白一点,我们必须提供目标文件名

不利的一面是因为我必须指定目标文件名我不能在气流中的 bash 运算符中使用它(这就是我认为我可能是错误的)

谢谢

任何方式希望这有助于