如何 运行 Airflow 任务同步

How to run Airflow tasks synchronously

我的气流包括 2-3 个步骤

  1. PythonOperator --> 它运行在 AWS Athena 上查询并将生成的文件存储在特定的 s3 路径上
  2. BashOperator --> 增加用于跟踪的气流变量
  3. BashOperator --> 它获取 task1 的输出(响应)和 运行 上面的一些代码。

这里发生的是气流在几秒钟内完成,即使 Athena 查询步骤是 运行ning。

我想确保在文件生成后,进一步的步骤应该 运行。基本上我希望这是同步的。

您可以将任务设置为:

def athena_task():
    # Add your code
    return

t1 = PythonOperator(
    task_id='athena_task',
    python_callable=athena_task,
)

t2 = BashOperator(
    task_id='variable_task',
    bash_command='', #replace with relevant command
)

t3 = BashOperator(
    task_id='process_task',
    bash_command='', #replace with relevant command
)

t1 >> t2 >> t3

t2只有在t1成功完成后才会运行,t3只有在t2成功完成后才会开始。

请注意 Airflow 有 AWSAthenaOperator 这可能会省去您自己编写代码的麻烦。操作员向 Athena 提交查询并通过设置 output_location 参数将输出保存在 S3 路径中:

run_query = AWSAthenaOperator(
    task_id='athena_task',
    query='SELECT * FROM  my_table',
    output_location='s3://some-bucket/some-path/',
    database='my_database'
)

Athena 的查询 API 是异步的。您开始查询,取回一个 ID,然后您需要使用 GetQueryExecution API 调用进行轮询,直到查询完成。

如果您仅在第一个任务中启动查询,则无法保证在下一个任务运行时查询已完成。只有当 GetQueryExecution 返回状态 SUCCEEDED(或 FAILED/CANCELLED)时,您才能期望输出文件存在。

正如@Elad 指出的那样,AWSAthenaOperator 会为您做这些,并处理错误情况,等等。