在对 Google Cloud Bucket 执行一些 ETL 后使用 to_csv

Using to_csv after preforming some ETL to a Google Cloud Bucket

我想知道是否有人可以提供帮助。我正在尝试从 GCP 存储桶中获取 CSV,运行 将其放入数据框,然后将文件输出到项目中的另一个存储桶,但是使用这种方法我的 dag 是 运行ning 但我难道我没有将任何输出放入我指定的存储桶中吗?我的狗需要很长时间才能 运行。对这个问题有什么见解吗?

import gcsfs
from airflow.operators import python_operator
from airflow import models
import pandas as pd
import logging
import csv
import datetime


fs = gcsfs.GCSFileSystem(project='project-goes-here')
with fs.open('gs://path/file.csv') as f:
    gas_data = pd.read_csv(f)


def make_csv():
    # Creates the CSV file with a datetime with no index, and adds the map, collection and collection address to the CSV
    # Calisto changed their mind on the position of where the conversion factor and multiplication factor should go
    gas_data['Asset collection'] = 'Distribution'
    gas_data['Asset collection address 1'] = 'Distribution'
    gas_data['Asset collection address 2'] = 'Units1+2 Central City'
    gas_data['Asset collection address 3'] = 'ind Est'
    gas_data['Asset collection city'] = 'Coventry'
    gas_data['Asset collection postcode'] = 'CV6 5RY'
    gas_data['Multiplication Factor'] = '1.000'
    gas_data['Conversion Factor'] = '1.022640'
    gas_data.to_csv('gs://path/'
                'Clean_zenos_data_' + datetime.datetime.today().strftime('%m%d%Y%H%M%S''.csv'), index=False,
                quotechar='"', sep=',', quoting=csv.QUOTE_NONNUMERIC)
                logging.info('Added Map, Asset collection, Asset collection address and Saved CSV')

    make_csv_function = python_operator.PythonOperator(
    task_id='make_csv',
    python_callable=make_csv
)

不确定我是否理解正确,但你似乎将你的 PythonOperator 创建嵌套在 make_csv 依赖项中,据我所知这是一个无限循环。也许尝试在函数之外删除它,看看会发生什么?

还有一个问题是您在任何 task/python 可调用函数之外读取 csv 文件。 Airflow 会在每次心跳时读取该文件(我相信是 1 分钟),这是不好的。 也许您可以将读取的 csv 移动到 make_csv() 函数内部,而且我还可以在您的代码中看到一些缩进错误。