从路易吉切换到气流

switching from luigi to airflow

我有一个相对简单的任务,从 1.2 mio 文件的 运行 开始,每个文件都有一个管道(中间产品保存有多个步骤)。我已经在 luigi 中实现了这个:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜欢 Luigi 使用文件系统来查看任务是否已完成。 我还找到了一个实现,我可以在其中删除中间产品,管道将重新创建所有相关产品(因此我可以更改管道)。 我如何在气流中做到这一点(或者我应该坚持使用 Luigi?)?

我真的不知道路易吉是怎么工作的。我主要使用 Apache Airflow。 Airflow 是一个工作流管理系统。这意味着它不传输数据、转换数据或生成一些数据(尽管它生成日志并且有一个名为 Xcom 的概念允许在任务之间交换消息,允许更细微的控制形式和共享状态。),例如.阿帕奇尼菲。但它定义了您使用 Operators 实例化它的每个任务的依赖关系,例如。 BashOperator。为了知道任务是否完成,它会检查同一任务返回的信号。

以下是您希望在 Airflow 中实现的示例。

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_dag', default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))


def extract_gzs():
    for filename in glob.glob('/1002/*.gz')
        with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


extractGZ = PythonOperator(
    task_id='extract_gz',
    provide_context=True,
    python_callable=extract_gzs(),
dag=dag)


cmd_cmd="""
your sed script!
"""

sed_script = BashOperator(
    task_id='sed_script', 
    bash_command=cmd_cmd, 
    dag=dag)


extractGZ.set_downstream(sed_script)
  1. 导入您想在 Airflow 中使用的运算符(当然如果您需要其他运算符 classes/libraries)
  2. 定义你的 Dag。在变量 args 中,我定义了 ownerstart_date 参数。
  3. 然后实例化你的DAG。这里我把它命名为example_dag,给它的定义变量赋值,schedule_interval以及超时时间(根据你的需要还有很多参数可以使用)
  4. 创建了一个 python 函数 extract_gzs()
  5. 实例化了一个 PythonOperator 我调用 python func
  6. 对 bash 代码做同样的事情
  7. 确定两个任务实例之间的依赖关系

当然,实现相同想法的方法还有很多。根据需要进行调整! PS: Here 有一些 Apache Airflow 的例子