如何使用 Airflow 重启失败的结构化流式 Spark 作业?

How to use Airflow to restart a failed structured streaming spark job?

我需要 运行 AWS EMR 中的结构化流式 Spark 作业。作为弹性需求,如果spark job由于某些原因失败了,我们希望可以在EMR中重新创建spark job。类似于ECS中的任务编排,可以在健康检查失败的情况下重启任务。然而,EMR 更像是一个计算引擎,而不是编排系统。

我正在寻找一些大数据工作流编排工具,比如Airflow。但是,它不能支持 DAG 中的循环。我怎样才能实现下面的一些功能?

step_adder (EmrAddStepsOperator) >> step_checker (EmrStepSensor) >> step_adder (EmrAddStepsOperator).

提高此类工作级别弹性的建议方法是什么?欢迎任何评论!

Apache Spark 已经涵盖了一些弹性(使用 spark-submit 提交的作业),但是当您想要与不使用 Spark 的不同进程交互时,Airflow 可能是一个解决方案。在您的情况下, Sensor 可以帮助检测是否发生了某种情况。基于此,您可以在 DAG 中做出决定。这是一个简单的 HttpSensor 等待批处理作业以查看它是否成功完成

wait_batch_to_finish = HttpSensor(
    http_conn_id='spark_web',
    task_id="wait_batch_to_finish",
    method="GET",
    headers={"Content-Type": "application/json"},
    endpoint="/json",
    response_check=lambda response: check_spark_status(response, "{{ ti.xcom_pull('batch_intel_task')}}"),
    poke_interval=60,
    dag=dag
)