到 运行 Spark 在 airflow (1**.1*.0.35) 中提交来自不同集群 (1**.1*.0.21) 的程序。如何远程连接气流中的其他集群

To run Spark Submit programs from a different cluster (1**.1*.0.21) in airflow (1**.1*.0.35). How to connect remotely other cluster in airflow

我一直在尝试在 Airflow 中使用 SparkSubmit 程序,但是 spark 文件在不同的集群 (1**.1*.0.21) 中,而 airflow 在 (1**.1*.0.35) 中。我正在寻找带有示例的该主题的详细说明。我无法将任何 xml 文件或其他文件复制或下载到我的气流集群。

当我尝试使用 SSH 挂钩时,它说。虽然我对使用 SSH Operator 和 BashOperator 有很多疑问。

Broken DAG: [/opt/airflow/dags/s.py] No module named paramiko

您可以尝试使用 Livy 在下面的 python 示例中,我的可执行 jar 在 S3 上。

import json, requests
def spark_submit(master_dns):
        host = 'http://' + master_dns + ':8998'
        data = {"conf": {"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"},
                'file': "s3://<your driver jar>",
                "jars": ["s3://<dependency>.jar"]
        headers = {'Content-Type': 'application/json'}
        print("Calling request........")
        response = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
        print(response.json())
        return response.headers

我是 运行 上面的代码包装成来自 Airflow

的 python 运算符

paramiko 是一个用于执行 ssh 操作的 python 库。您必须安装 paramiko 才能使用 SSH 操作员。 只需安装 paramiko, 命令:- pip3 install paramiko.

如果您在安装 paramiko 后有任何问题,请告诉我。

我连接上了,这是我的代码和程序。

import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


dag = DAG(dag_id = "spk", description='filer',
          schedule_interval='* * * * *',
          start_date = airflow.utils.dates.days_ago(2),
          params={'project_source': '/home/afzal',
                  'spark_submit': '/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client airpy.py'})

templated_bash_command = """
            cd {{ params.project_source }}
            {{ params.spark_submit }} 
            """

t1 = SSHOperator(
       task_id="SSH_task",
       ssh_conn_id='spark_21',
       command=templated_bash_command,
       dag=dag
       )

而且我还在 'Admin > Connections' in airflow

中创建了一个连接
Conn Id : spark_21
Conn Type : SSH
Host : mas****p
Username : afzal
Password : ***** 
Port  :
Extra  :

用户名和密码用于登录所需的集群。