从 Airflow(使用 airflow Livy 运算符)将 Spark 作业提交给 Livy(在 EMR 中)
Submitting Spark Job to Livy (in EMR) from Airflow (using airflow Livy operator)
我正在尝试使用气流在 EMR 中安排工作 livy operator. Here is the example code 我遵循了。这里的问题是......没有指定 Livy 连接字符串(主机名和端口)。如何为运营商提供 Livy Server 主机名和端口?
此外,运算符有参数livy_conn_id
,在示例中设置了值livy_conn_default
。这是正确的值吗?...还是我设置了其他值?
您的 Airflow 仪表板的“管理”选项卡中的连接下应该有 livy_conn_default
,如果设置正确,那么是的,您可以使用它。否则,您可以更改它或创建另一个连接 ID 并在 livy_conn_id
中使用它
我们可以使用 2 APIs 来连接 Livy 和 Airflow:
- 使用 LivyBatchOperator
- 使用 LivyOperator
在下面的示例中,我将介绍 LivyOperator API.
LivyOperator
第一步:更新livy配置:
登录 airflow ui --> 单击 Admin 选项卡 --> 连接 --> 搜索 livy .单击编辑按钮并更新 Host 和 Port 参数。
第二步:安装apache-airflow-providers-apache-livy
pip install apache-airflow-providers-apache-livy
第三步:在$AIRFLOW_HOME/dags
目录下创建数据文件
vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'RangaReddy',
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id = "livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval='@once',
start_date = datetime(2022, 3, 2),
tags=['example', 'spark', 'livy']
)
# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
class_name="org.apache.spark.examples.SparkPi",
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")
begin_task >> livy_sparkpi_submit_task >> end_task
LIVY_HOST=192.168.0.1
curl http://${LIVY_HOST}:8998/batches/0/log | python3 -m json.tool
输出:
"Pi is roughly 3.14144103141441"
我正在尝试使用气流在 EMR 中安排工作 livy operator. Here is the example code 我遵循了。这里的问题是......没有指定 Livy 连接字符串(主机名和端口)。如何为运营商提供 Livy Server 主机名和端口?
此外,运算符有参数livy_conn_id
,在示例中设置了值livy_conn_default
。这是正确的值吗?...还是我设置了其他值?
您的 Airflow 仪表板的“管理”选项卡中的连接下应该有 livy_conn_default
,如果设置正确,那么是的,您可以使用它。否则,您可以更改它或创建另一个连接 ID 并在 livy_conn_id
我们可以使用 2 APIs 来连接 Livy 和 Airflow:
- 使用 LivyBatchOperator
- 使用 LivyOperator
在下面的示例中,我将介绍 LivyOperator API.
LivyOperator
第一步:更新livy配置:
登录 airflow ui --> 单击 Admin 选项卡 --> 连接 --> 搜索 livy .单击编辑按钮并更新 Host 和 Port 参数。
第二步:安装apache-airflow-providers-apache-livy
pip install apache-airflow-providers-apache-livy
第三步:在$AIRFLOW_HOME/dags
目录下创建数据文件
vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'RangaReddy',
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id = "livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval='@once',
start_date = datetime(2022, 3, 2),
tags=['example', 'spark', 'livy']
)
# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
class_name="org.apache.spark.examples.SparkPi",
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")
begin_task >> livy_sparkpi_submit_task >> end_task
LIVY_HOST=192.168.0.1
curl http://${LIVY_HOST}:8998/batches/0/log | python3 -m json.tool
输出:
"Pi is roughly 3.14144103141441"