Airflow s3Hook - 使用 pandas read_csv 读取 s3 中的文件
Airflow s3Hook - read files in s3 with pandas read_csv
我正在尝试使用 s3Hook
读取一些带有 pandas 的文件以获取密钥。我能够获得密钥,但是我不确定如何获得 pandas 来查找文件,当我 运行 以下时,我得到:
No such file or directory:
这是我的代码:
def transform_pages(company, **context):
ds = context.get("execution_date").strftime('%Y-%m-%d')
s3 = S3Hook('aws_default')
s3_conn = s3.get_conn()
keys = s3.list_keys(bucket_name=Variable.get('s3_bucket'),
prefix=f'S/{company}/pages/date={ds}/',
delimiter="/")
prefix = f'S/{company}/pages/date={ds}/'
logging.info(f'keys from function: {keys}')
""" transforming pages and loading data back to S3 """
for file in keys:
df = pd.read_csv(file, sep='\t', skiprows=1, header=None)
您要查找的格式如下:
filepath = f"s3://{bucket_name}/{key}"
所以在你的具体情况下,类似于:
for file in keys:
filepath = f"s3://s3_bucket/{file}"
df = pd.read_csv(filepath, sep='\t', skiprows=1, header=None)
但请确保您已安装 s3fs
(pip install s3fs
)。
我正在尝试使用 s3Hook
读取一些带有 pandas 的文件以获取密钥。我能够获得密钥,但是我不确定如何获得 pandas 来查找文件,当我 运行 以下时,我得到:
No such file or directory:
这是我的代码:
def transform_pages(company, **context):
ds = context.get("execution_date").strftime('%Y-%m-%d')
s3 = S3Hook('aws_default')
s3_conn = s3.get_conn()
keys = s3.list_keys(bucket_name=Variable.get('s3_bucket'),
prefix=f'S/{company}/pages/date={ds}/',
delimiter="/")
prefix = f'S/{company}/pages/date={ds}/'
logging.info(f'keys from function: {keys}')
""" transforming pages and loading data back to S3 """
for file in keys:
df = pd.read_csv(file, sep='\t', skiprows=1, header=None)
您要查找的格式如下:
filepath = f"s3://{bucket_name}/{key}"
所以在你的具体情况下,类似于:
for file in keys:
filepath = f"s3://s3_bucket/{file}"
df = pd.read_csv(filepath, sep='\t', skiprows=1, header=None)
但请确保您已安装 s3fs
(pip install s3fs
)。