到 运行 Spark 在 airflow (1**.1*.0.35) 中提交来自不同集群 (1**.1*.0.21) 的程序。如何远程连接气流中的其他集群
To run Spark Submit programs from a different cluster (1**.1*.0.21) in airflow (1**.1*.0.35). How to connect remotely other cluster in airflow
我一直在尝试在 Airflow 中使用 SparkSubmit 程序,但是 spark 文件在不同的集群 (1**.1*.0.21) 中,而 airflow 在 (1**.1*.0.35) 中。我正在寻找带有示例的该主题的详细说明。我无法将任何 xml 文件或其他文件复制或下载到我的气流集群。
当我尝试使用 SSH 挂钩时,它说。虽然我对使用 SSH Operator 和 BashOperator 有很多疑问。
Broken DAG: [/opt/airflow/dags/s.py] No module named paramiko
您可以尝试使用 Livy
在下面的 python 示例中,我的可执行 jar 在 S3 上。
import json, requests
def spark_submit(master_dns):
host = 'http://' + master_dns + ':8998'
data = {"conf": {"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"},
'file': "s3://<your driver jar>",
"jars": ["s3://<dependency>.jar"]
headers = {'Content-Type': 'application/json'}
print("Calling request........")
response = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
print(response.json())
return response.headers
我是 运行 上面的代码包装成来自 Airflow
的 python 运算符
paramiko 是一个用于执行 ssh 操作的 python 库。您必须安装 paramiko 才能使用 SSH 操作员。
只需安装 paramiko,
命令:- pip3 install paramiko.
如果您在安装 paramiko 后有任何问题,请告诉我。
我连接上了,这是我的代码和程序。
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
dag = DAG(dag_id = "spk", description='filer',
schedule_interval='* * * * *',
start_date = airflow.utils.dates.days_ago(2),
params={'project_source': '/home/afzal',
'spark_submit': '/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client airpy.py'})
templated_bash_command = """
cd {{ params.project_source }}
{{ params.spark_submit }}
"""
t1 = SSHOperator(
task_id="SSH_task",
ssh_conn_id='spark_21',
command=templated_bash_command,
dag=dag
)
而且我还在 'Admin > Connections' in airflow
中创建了一个连接
Conn Id : spark_21
Conn Type : SSH
Host : mas****p
Username : afzal
Password : *****
Port :
Extra :
用户名和密码用于登录所需的集群。
我一直在尝试在 Airflow 中使用 SparkSubmit 程序,但是 spark 文件在不同的集群 (1**.1*.0.21) 中,而 airflow 在 (1**.1*.0.35) 中。我正在寻找带有示例的该主题的详细说明。我无法将任何 xml 文件或其他文件复制或下载到我的气流集群。
当我尝试使用 SSH 挂钩时,它说。虽然我对使用 SSH Operator 和 BashOperator 有很多疑问。
Broken DAG: [/opt/airflow/dags/s.py] No module named paramiko
您可以尝试使用 Livy 在下面的 python 示例中,我的可执行 jar 在 S3 上。
import json, requests
def spark_submit(master_dns):
host = 'http://' + master_dns + ':8998'
data = {"conf": {"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"},
'file': "s3://<your driver jar>",
"jars": ["s3://<dependency>.jar"]
headers = {'Content-Type': 'application/json'}
print("Calling request........")
response = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
print(response.json())
return response.headers
我是 运行 上面的代码包装成来自 Airflow
的 python 运算符paramiko 是一个用于执行 ssh 操作的 python 库。您必须安装 paramiko 才能使用 SSH 操作员。 只需安装 paramiko, 命令:- pip3 install paramiko.
如果您在安装 paramiko 后有任何问题,请告诉我。
我连接上了,这是我的代码和程序。
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
dag = DAG(dag_id = "spk", description='filer',
schedule_interval='* * * * *',
start_date = airflow.utils.dates.days_ago(2),
params={'project_source': '/home/afzal',
'spark_submit': '/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client airpy.py'})
templated_bash_command = """
cd {{ params.project_source }}
{{ params.spark_submit }}
"""
t1 = SSHOperator(
task_id="SSH_task",
ssh_conn_id='spark_21',
command=templated_bash_command,
dag=dag
)
而且我还在 'Admin > Connections' in airflow
中创建了一个连接Conn Id : spark_21
Conn Type : SSH
Host : mas****p
Username : afzal
Password : *****
Port :
Extra :
用户名和密码用于登录所需的集群。