如何 运行 Airflow 中的 Spark 代码?
How to run Spark code in Airflow?
地球人大家好!
我正在使用 Airflow 来安排 运行 Spark 任务。
这次我发现的是 Airflow 可以管理的 python 个 DAG。
DAG 示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
问题是我不擅长 Python 代码并且有一些任务是用 Java 编写的。我的问题是如何在 python DAG 中 运行 Spark Java jar?或者也许还有其他方法哟?我发现 spark 提交:http://spark.apache.org/docs/latest/submitting-applications.html
但我不知道如何将所有内容联系在一起。也许有人以前用过它并且有工作示例。感谢您的宝贵时间!
您应该可以使用 BashOperator
。保持其余代码不变,导入所需的 class 和系统包:
from airflow.operators.bash_operator import BashOperator
import os
import sys
设置所需路径:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
并添加运算符:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
dag=dag
)
您可以轻松地扩展它以使用 Jinja 模板提供额外的参数。
您当然可以针对非 Spark 场景进行调整,将 bash_command
替换为适合您情况的模板,例如:
bash_command = 'java -jar {{ params.jar }}'
并调整 params
.
从 1.8 版(今天发布)开始的 Airflow 具有
- SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py;
SparkSQLHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
- SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
SparkSubmitHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
请注意,这两个新的 Spark operators/hooks 在 1.8 版本的 "contrib" 分支中,因此没有(很好)记录。
因此您可以使用 SparkSubmitOperator 提交您的 java 代码以执行 Spark。
有一个 SparkSubmitOperator
在 kubernetes(minikube 实例)上使用 Spark 2.3.1 的示例:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': 'user@mail.com',
'depends_on_past': False,
'start_date': datetime(2018, 7, 27),
'email': ['user@mail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2018, 7, 29),
}
dag = DAG(
'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.ibm.cdopoc.DataLoaderDB2COS',
application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='airflowspark-DataLoaderDB2COS',
verbose=True,
driver_memory='1g',
conf={
'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
'spark.COS_BUCKET': 'data-ingestion-poc',
'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
},
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
使用 Airflow 变量中存储的变量的代码:
此外,您需要创建一个新的火花连接或编辑现有的 'spark_default'
额外的字典 {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}
:
转到 Airflow Admin
-> Connection
-> Create
UI。通过提供主机 = IP 地址、端口 = 22 和额外的 {"key_file": "/path/to/pem/file", "no_host_key_check":true}
创建一个新的 SSH 连接
此主机应该是您可以从中提交的 Spark 集群主机 spark-jobs。接下来,您需要使用 SSHOperator 创建一个 DAG。以下是此模板。
with DAG(dag_id='ssh-dag-id',
schedule_interval="05 12 * * *",
catchup=False) as dag:
spark_job = ("spark-submit --class fully.qualified.class.name "
"--master yarn "
"--deploy-mode client "
"--driver-memory 6G "
"--executor-memory 6G "
"--num-executors 6 "
"/path/to/your-spark.jar")
ssh_run_query = SSHOperator(
task_id="random_task_id",
ssh_conn_id="name_of_connection_you just_created",
command=spark_job,
get_pty=True,
dag=dag)
ssh_run_query
就是这样。您还可以在 Airflow 中获得此 Spark 作业的完整日志。
地球人大家好!
我正在使用 Airflow 来安排 运行 Spark 任务。
这次我发现的是 Airflow 可以管理的 python 个 DAG。
DAG 示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
问题是我不擅长 Python 代码并且有一些任务是用 Java 编写的。我的问题是如何在 python DAG 中 运行 Spark Java jar?或者也许还有其他方法哟?我发现 spark 提交:http://spark.apache.org/docs/latest/submitting-applications.html
但我不知道如何将所有内容联系在一起。也许有人以前用过它并且有工作示例。感谢您的宝贵时间!
您应该可以使用 BashOperator
。保持其余代码不变,导入所需的 class 和系统包:
from airflow.operators.bash_operator import BashOperator
import os
import sys
设置所需路径:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
并添加运算符:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
dag=dag
)
您可以轻松地扩展它以使用 Jinja 模板提供额外的参数。
您当然可以针对非 Spark 场景进行调整,将 bash_command
替换为适合您情况的模板,例如:
bash_command = 'java -jar {{ params.jar }}'
并调整 params
.
从 1.8 版(今天发布)开始的 Airflow 具有
- SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py;
SparkSQLHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
- SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
SparkSubmitHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
请注意,这两个新的 Spark operators/hooks 在 1.8 版本的 "contrib" 分支中,因此没有(很好)记录。
因此您可以使用 SparkSubmitOperator 提交您的 java 代码以执行 Spark。
有一个 SparkSubmitOperator
在 kubernetes(minikube 实例)上使用 Spark 2.3.1 的示例:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': 'user@mail.com',
'depends_on_past': False,
'start_date': datetime(2018, 7, 27),
'email': ['user@mail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2018, 7, 29),
}
dag = DAG(
'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.ibm.cdopoc.DataLoaderDB2COS',
application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='airflowspark-DataLoaderDB2COS',
verbose=True,
driver_memory='1g',
conf={
'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
'spark.COS_BUCKET': 'data-ingestion-poc',
'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
},
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
使用 Airflow 变量中存储的变量的代码:
此外,您需要创建一个新的火花连接或编辑现有的 'spark_default'
额外的字典 {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}
:
转到 Airflow Admin
-> Connection
-> Create
UI。通过提供主机 = IP 地址、端口 = 22 和额外的 {"key_file": "/path/to/pem/file", "no_host_key_check":true}
此主机应该是您可以从中提交的 Spark 集群主机 spark-jobs。接下来,您需要使用 SSHOperator 创建一个 DAG。以下是此模板。
with DAG(dag_id='ssh-dag-id',
schedule_interval="05 12 * * *",
catchup=False) as dag:
spark_job = ("spark-submit --class fully.qualified.class.name "
"--master yarn "
"--deploy-mode client "
"--driver-memory 6G "
"--executor-memory 6G "
"--num-executors 6 "
"/path/to/your-spark.jar")
ssh_run_query = SSHOperator(
task_id="random_task_id",
ssh_conn_id="name_of_connection_you just_created",
command=spark_job,
get_pty=True,
dag=dag)
ssh_run_query
就是这样。您还可以在 Airflow 中获得此 Spark 作业的完整日志。