Airflow s3Hook - 使用 pandas read_csv 读取 s3 中的文件

Airflow s3Hook - read files in s3 with pandas read_csv

我正在尝试使用 s3Hook 读取一些带有 pandas 的文件以获取密钥。我能够获得密钥,但是我不确定如何获得 pandas 来查找文件,当我 运行 以下时,我得到:

No such file or directory:

这是我的代码:

def transform_pages(company, **context):
    ds = context.get("execution_date").strftime('%Y-%m-%d')

    s3 = S3Hook('aws_default')
    s3_conn = s3.get_conn()
    keys = s3.list_keys(bucket_name=Variable.get('s3_bucket'),
                        prefix=f'S/{company}/pages/date={ds}/',
                        delimiter="/")

    prefix = f'S/{company}/pages/date={ds}/'
    logging.info(f'keys from function: {keys}')

    """ transforming pages and loading data back to S3 """
    for file in keys:
        df = pd.read_csv(file, sep='\t', skiprows=1, header=None)

您要查找的格式如下:

filepath = f"s3://{bucket_name}/{key}"

所以在你的具体情况下,类似于:

for file in keys:
    filepath = f"s3://s3_bucket/{file}"
    df = pd.read_csv(filepath, sep='\t', skiprows=1, header=None)

但请确保您已安装 s3fs (pip install s3fs)。