如何 运行 Airflow 任务同步
How to run Airflow tasks synchronously
我的气流包括 2-3 个步骤
- PythonOperator --> 它运行在 AWS Athena 上查询并将生成的文件存储在特定的 s3 路径上
- BashOperator --> 增加用于跟踪的气流变量
- BashOperator --> 它获取 task1 的输出(响应)和 运行 上面的一些代码。
这里发生的是气流在几秒钟内完成,即使 Athena 查询步骤是 运行ning。
我想确保在文件生成后,进一步的步骤应该 运行。基本上我希望这是同步的。
您可以将任务设置为:
def athena_task():
# Add your code
return
t1 = PythonOperator(
task_id='athena_task',
python_callable=athena_task,
)
t2 = BashOperator(
task_id='variable_task',
bash_command='', #replace with relevant command
)
t3 = BashOperator(
task_id='process_task',
bash_command='', #replace with relevant command
)
t1 >> t2 >> t3
t2只有在t1成功完成后才会运行,t3只有在t2成功完成后才会开始。
请注意 Airflow 有 AWSAthenaOperator 这可能会省去您自己编写代码的麻烦。操作员向 Athena 提交查询并通过设置 output_location
参数将输出保存在 S3 路径中:
run_query = AWSAthenaOperator(
task_id='athena_task',
query='SELECT * FROM my_table',
output_location='s3://some-bucket/some-path/',
database='my_database'
)
Athena 的查询 API 是异步的。您开始查询,取回一个 ID,然后您需要使用 GetQueryExecution
API 调用进行轮询,直到查询完成。
如果您仅在第一个任务中启动查询,则无法保证在下一个任务运行时查询已完成。只有当 GetQueryExecution
返回状态 SUCCEEDED
(或 FAILED
/CANCELLED
)时,您才能期望输出文件存在。
正如@Elad 指出的那样,AWSAthenaOperator
会为您做这些,并处理错误情况,等等。
我的气流包括 2-3 个步骤
- PythonOperator --> 它运行在 AWS Athena 上查询并将生成的文件存储在特定的 s3 路径上
- BashOperator --> 增加用于跟踪的气流变量
- BashOperator --> 它获取 task1 的输出(响应)和 运行 上面的一些代码。
这里发生的是气流在几秒钟内完成,即使 Athena 查询步骤是 运行ning。
我想确保在文件生成后,进一步的步骤应该 运行。基本上我希望这是同步的。
您可以将任务设置为:
def athena_task():
# Add your code
return
t1 = PythonOperator(
task_id='athena_task',
python_callable=athena_task,
)
t2 = BashOperator(
task_id='variable_task',
bash_command='', #replace with relevant command
)
t3 = BashOperator(
task_id='process_task',
bash_command='', #replace with relevant command
)
t1 >> t2 >> t3
t2只有在t1成功完成后才会运行,t3只有在t2成功完成后才会开始。
请注意 Airflow 有 AWSAthenaOperator 这可能会省去您自己编写代码的麻烦。操作员向 Athena 提交查询并通过设置 output_location
参数将输出保存在 S3 路径中:
run_query = AWSAthenaOperator(
task_id='athena_task',
query='SELECT * FROM my_table',
output_location='s3://some-bucket/some-path/',
database='my_database'
)
Athena 的查询 API 是异步的。您开始查询,取回一个 ID,然后您需要使用 GetQueryExecution
API 调用进行轮询,直到查询完成。
如果您仅在第一个任务中启动查询,则无法保证在下一个任务运行时查询已完成。只有当 GetQueryExecution
返回状态 SUCCEEDED
(或 FAILED
/CANCELLED
)时,您才能期望输出文件存在。
正如@Elad 指出的那样,AWSAthenaOperator
会为您做这些,并处理错误情况,等等。