从路易吉切换到气流
switching from luigi to airflow
我有一个相对简单的任务,从 1.2 mio 文件的 运行 开始,每个文件都有一个管道(中间产品保存有多个步骤)。我已经在 luigi 中实现了这个:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜欢 Luigi 使用文件系统来查看任务是否已完成。
我还找到了一个实现,我可以在其中删除中间产品,管道将重新创建所有相关产品(因此我可以更改管道)。
我如何在气流中做到这一点(或者我应该坚持使用 Luigi?)?
我真的不知道路易吉是怎么工作的。我主要使用 Apache Airflow。 Airflow 是一个工作流管理系统。这意味着它不传输数据、转换数据或生成一些数据(尽管它生成日志并且有一个名为 Xcom
的概念允许在任务之间交换消息,允许更细微的控制形式和共享状态。),例如.阿帕奇尼菲。但它定义了您使用 Operators
实例化它的每个任务的依赖关系,例如。 BashOperator
。为了知道任务是否完成,它会检查同一任务返回的信号。
以下是您希望在 Airflow 中实现的示例。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dag', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
def extract_gzs():
for filename in glob.glob('/1002/*.gz')
with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
extractGZ = PythonOperator(
task_id='extract_gz',
provide_context=True,
python_callable=extract_gzs(),
dag=dag)
cmd_cmd="""
your sed script!
"""
sed_script = BashOperator(
task_id='sed_script',
bash_command=cmd_cmd,
dag=dag)
extractGZ.set_downstream(sed_script)
- 导入您想在 Airflow 中使用的运算符(当然如果您需要其他运算符 classes/libraries)
- 定义你的 Dag。在变量
args
中,我定义了 owner
和 start_date
参数。
- 然后实例化你的DAG。这里我把它命名为example_dag,给它的定义变量赋值,schedule_interval以及超时时间(根据你的需要还有很多参数可以使用)
- 创建了一个 python 函数 extract_gzs()
- 实例化了一个
PythonOperator
我调用 python func
- 对 bash 代码做同样的事情
- 确定两个任务实例之间的依赖关系
当然,实现相同想法的方法还有很多。根据需要进行调整!
PS: Here 有一些 Apache Airflow 的例子
我有一个相对简单的任务,从 1.2 mio 文件的 运行 开始,每个文件都有一个管道(中间产品保存有多个步骤)。我已经在 luigi 中实现了这个:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜欢 Luigi 使用文件系统来查看任务是否已完成。 我还找到了一个实现,我可以在其中删除中间产品,管道将重新创建所有相关产品(因此我可以更改管道)。 我如何在气流中做到这一点(或者我应该坚持使用 Luigi?)?
我真的不知道路易吉是怎么工作的。我主要使用 Apache Airflow。 Airflow 是一个工作流管理系统。这意味着它不传输数据、转换数据或生成一些数据(尽管它生成日志并且有一个名为 Xcom
的概念允许在任务之间交换消息,允许更细微的控制形式和共享状态。),例如.阿帕奇尼菲。但它定义了您使用 Operators
实例化它的每个任务的依赖关系,例如。 BashOperator
。为了知道任务是否完成,它会检查同一任务返回的信号。
以下是您希望在 Airflow 中实现的示例。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dag', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
def extract_gzs():
for filename in glob.glob('/1002/*.gz')
with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
extractGZ = PythonOperator(
task_id='extract_gz',
provide_context=True,
python_callable=extract_gzs(),
dag=dag)
cmd_cmd="""
your sed script!
"""
sed_script = BashOperator(
task_id='sed_script',
bash_command=cmd_cmd,
dag=dag)
extractGZ.set_downstream(sed_script)
- 导入您想在 Airflow 中使用的运算符(当然如果您需要其他运算符 classes/libraries)
- 定义你的 Dag。在变量
args
中,我定义了owner
和start_date
参数。 - 然后实例化你的DAG。这里我把它命名为example_dag,给它的定义变量赋值,schedule_interval以及超时时间(根据你的需要还有很多参数可以使用)
- 创建了一个 python 函数 extract_gzs()
- 实例化了一个
PythonOperator
我调用 python func - 对 bash 代码做同样的事情
- 确定两个任务实例之间的依赖关系
当然,实现相同想法的方法还有很多。根据需要进行调整! PS: Here 有一些 Apache Airflow 的例子