在对 Google Cloud Bucket 执行一些 ETL 后使用 to_csv
Using to_csv after preforming some ETL to a Google Cloud Bucket
我想知道是否有人可以提供帮助。我正在尝试从 GCP 存储桶中获取 CSV,运行 将其放入数据框,然后将文件输出到项目中的另一个存储桶,但是使用这种方法我的 dag 是 运行ning 但我难道我没有将任何输出放入我指定的存储桶中吗?我的狗需要很长时间才能 运行。对这个问题有什么见解吗?
import gcsfs
from airflow.operators import python_operator
from airflow import models
import pandas as pd
import logging
import csv
import datetime
fs = gcsfs.GCSFileSystem(project='project-goes-here')
with fs.open('gs://path/file.csv') as f:
gas_data = pd.read_csv(f)
def make_csv():
# Creates the CSV file with a datetime with no index, and adds the map, collection and collection address to the CSV
# Calisto changed their mind on the position of where the conversion factor and multiplication factor should go
gas_data['Asset collection'] = 'Distribution'
gas_data['Asset collection address 1'] = 'Distribution'
gas_data['Asset collection address 2'] = 'Units1+2 Central City'
gas_data['Asset collection address 3'] = 'ind Est'
gas_data['Asset collection city'] = 'Coventry'
gas_data['Asset collection postcode'] = 'CV6 5RY'
gas_data['Multiplication Factor'] = '1.000'
gas_data['Conversion Factor'] = '1.022640'
gas_data.to_csv('gs://path/'
'Clean_zenos_data_' + datetime.datetime.today().strftime('%m%d%Y%H%M%S''.csv'), index=False,
quotechar='"', sep=',', quoting=csv.QUOTE_NONNUMERIC)
logging.info('Added Map, Asset collection, Asset collection address and Saved CSV')
make_csv_function = python_operator.PythonOperator(
task_id='make_csv',
python_callable=make_csv
)
不确定我是否理解正确,但你似乎将你的 PythonOperator
创建嵌套在 make_csv
依赖项中,据我所知这是一个无限循环。也许尝试在函数之外删除它,看看会发生什么?
还有一个问题是您在任何 task/python 可调用函数之外读取 csv 文件。 Airflow 会在每次心跳时读取该文件(我相信是 1 分钟),这是不好的。
也许您可以将读取的 csv 移动到 make_csv()
函数内部,而且我还可以在您的代码中看到一些缩进错误。
我想知道是否有人可以提供帮助。我正在尝试从 GCP 存储桶中获取 CSV,运行 将其放入数据框,然后将文件输出到项目中的另一个存储桶,但是使用这种方法我的 dag 是 运行ning 但我难道我没有将任何输出放入我指定的存储桶中吗?我的狗需要很长时间才能 运行。对这个问题有什么见解吗?
import gcsfs
from airflow.operators import python_operator
from airflow import models
import pandas as pd
import logging
import csv
import datetime
fs = gcsfs.GCSFileSystem(project='project-goes-here')
with fs.open('gs://path/file.csv') as f:
gas_data = pd.read_csv(f)
def make_csv():
# Creates the CSV file with a datetime with no index, and adds the map, collection and collection address to the CSV
# Calisto changed their mind on the position of where the conversion factor and multiplication factor should go
gas_data['Asset collection'] = 'Distribution'
gas_data['Asset collection address 1'] = 'Distribution'
gas_data['Asset collection address 2'] = 'Units1+2 Central City'
gas_data['Asset collection address 3'] = 'ind Est'
gas_data['Asset collection city'] = 'Coventry'
gas_data['Asset collection postcode'] = 'CV6 5RY'
gas_data['Multiplication Factor'] = '1.000'
gas_data['Conversion Factor'] = '1.022640'
gas_data.to_csv('gs://path/'
'Clean_zenos_data_' + datetime.datetime.today().strftime('%m%d%Y%H%M%S''.csv'), index=False,
quotechar='"', sep=',', quoting=csv.QUOTE_NONNUMERIC)
logging.info('Added Map, Asset collection, Asset collection address and Saved CSV')
make_csv_function = python_operator.PythonOperator(
task_id='make_csv',
python_callable=make_csv
)
不确定我是否理解正确,但你似乎将你的 PythonOperator
创建嵌套在 make_csv
依赖项中,据我所知这是一个无限循环。也许尝试在函数之外删除它,看看会发生什么?
还有一个问题是您在任何 task/python 可调用函数之外读取 csv 文件。 Airflow 会在每次心跳时读取该文件(我相信是 1 分钟),这是不好的。
也许您可以将读取的 csv 移动到 make_csv()
函数内部,而且我还可以在您的代码中看到一些缩进错误。