如何使用气流编排简单的 pandas etl python 脚本?
How to use airflow for orchestrating simple pandas etl python scripts?
我喜欢气流的想法,但我受困于基础知识。从昨天开始,我在 vm ubuntu-postgres 解决方案上使用了 airflow 运行。我可以看到仪表板和示例数据 :))我现在想要的是迁移一个示例脚本,我用它来处理原始数据到准备好的数据。
假设您有一个 csv 文件文件夹。今天我的脚本遍历它,将每个文件传递给一个将被转换成 df 的列表。之后我准备了他们的列名并进行了一些数据清理并将其写入不同的格式。
1: pd.read_csv 用于目录
中的文件
2: 创建一个 df
3:清理列名
4:干净的值(与 stp 3 平行)
5: 将结果写入数据库
我将如何根据气流组织我的文件?脚本应该是什么样子的?我是传递单一方法、单一文件还是必须为每个部分创建多个文件?在这一点上我缺乏基本概念 :( 我读到的关于气流的所有内容都比我的简单案例复杂得多。我正在考虑远离气流以及 Bonobo、Mara、Luigi,但我认为气流是值得的?!
我会使用 PythonOperator
,将整个代码放入 Python 函数中,创建一个 Airflow 任务,仅此而已。
如果需要拆分这些步骤,也可以将 csv 文件的加载和数据库写入放在一个函数中。所有这些都将放在一个 DAG 中。
因此您的一个 DAG 将具有三个任务,例如:
loadCSV (PythonOperator)
parseDF (PythonOperator)
pushToDB (PythonOperator)
如果您使用多项任务,您需要使用Airflow's XCom。一开始只使用一个任务会更容易。
此处标签气流下有几个代码示例。当你创造了一些东西,再问一次。
对于仍然坚持这个问题的人,我们最近为气流实施了自定义 XCom 后端,由 vineyard 支持,以支持此类情况。
提供商在那里是开源的:https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
借助 Vineyard XCom 后端,用户可以拥有直接生产和消费 pandas.DataFrame
的 dag,而无需任何“to_csv” + “from_csv” hack,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
希望对您有所帮助。
我喜欢气流的想法,但我受困于基础知识。从昨天开始,我在 vm ubuntu-postgres 解决方案上使用了 airflow 运行。我可以看到仪表板和示例数据 :))我现在想要的是迁移一个示例脚本,我用它来处理原始数据到准备好的数据。
假设您有一个 csv 文件文件夹。今天我的脚本遍历它,将每个文件传递给一个将被转换成 df 的列表。之后我准备了他们的列名并进行了一些数据清理并将其写入不同的格式。
1: pd.read_csv 用于目录
中的文件2: 创建一个 df
3:清理列名
4:干净的值(与 stp 3 平行)
5: 将结果写入数据库
我将如何根据气流组织我的文件?脚本应该是什么样子的?我是传递单一方法、单一文件还是必须为每个部分创建多个文件?在这一点上我缺乏基本概念 :( 我读到的关于气流的所有内容都比我的简单案例复杂得多。我正在考虑远离气流以及 Bonobo、Mara、Luigi,但我认为气流是值得的?!
我会使用 PythonOperator
,将整个代码放入 Python 函数中,创建一个 Airflow 任务,仅此而已。
如果需要拆分这些步骤,也可以将 csv 文件的加载和数据库写入放在一个函数中。所有这些都将放在一个 DAG 中。
因此您的一个 DAG 将具有三个任务,例如:
loadCSV (PythonOperator)
parseDF (PythonOperator)
pushToDB (PythonOperator)
如果您使用多项任务,您需要使用Airflow's XCom。一开始只使用一个任务会更容易。
此处标签气流下有几个代码示例。当你创造了一些东西,再问一次。
对于仍然坚持这个问题的人,我们最近为气流实施了自定义 XCom 后端,由 vineyard 支持,以支持此类情况。
提供商在那里是开源的:https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
借助 Vineyard XCom 后端,用户可以拥有直接生产和消费 pandas.DataFrame
的 dag,而无需任何“to_csv” + “from_csv” hack,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
希望对您有所帮助。