Airflow SSH operator: how to specify the authentication type

I'm trying to SSH from an Airflow worker to one of my servers using the SSHOperator. SSH on the server is configured to use Kerberos authentication. The default SSH connection fails with the following error:

SSH operator error: Bad authentication type; allowed types: ['publickey', 'gssapi-keyex', 'gssapi-with-mic', 'keyboard-interactive']
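For context, the task is defined roughly like this (task and connection names are illustrative):

from airflow.contrib.operators.ssh_operator import SSHOperator

# Stock SSHOperator using a standard Airflow SSH connection; the server only
# offers GSSAPI/keyboard-interactive methods, so the login attempt is rejected.
ssh_task = SSHOperator(dag=dag,
                       task_id='sample_task',
                       ssh_conn_id='ssh_con_id',
                       command='whoami')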

How can I fix this error? I tried the following settings in the Extra field of the connection in the Airflow UI:
{ "gss-auth":"true","gssapi-keyex":"true","gss-kex":"true"}

Is there an option in Airflow's SSHOperator to specify Kerberos as the authentication type to use?

Kerberos authentication is not supported by Airflow's existing SSHOperator, even though the underlying Paramiko library supports it.
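For reference, plain Paramiko can already do this via its GSSAPI options; a minimal sketch, assuming a valid Kerberos ticket on the machine and Paramiko's optional gssapi/pykerberos dependency installed (hostname and username are placeholders):

import paramiko

client = paramiko.SSHClient()
client.load_system_host_keys()
# gss_auth=True tells Paramiko to attempt gssapi-with-mic authentication
client.connect(hostname='edge-node.example.com',  # placeholder
               username='airflow',                # placeholder
               gss_auth=True)
stdin, stdout, stderr = client.exec_command('whoami')
print(stdout.read())
client.close()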

I was able to work around this by writing a custom hook extending SSHHook that passes a parameter down to the underlying Paramiko library to specify Kerberos as the authentication type. It works! Thanks to Airflow's easy extensibility.

The custom hook:

import paramiko
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.plugins_manager import AirflowPlugin


class CustomSSHHook(SSHHook):
    """
    Custom SSH hook with Kerberos authentication support.
    """

    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 username=None,
                 password=None,
                 key_file=None,
                 port=None,
                 timeout=10,
                 keepalive_interval=30):
        super(CustomSSHHook, self).__init__(
                 ssh_conn_id,
                 remote_host,
                 username,
                 password,
                 key_file,
                 port,
                 timeout,
                 keepalive_interval)
        # Use connection to override defaults
        self.gss_auth = False
        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)

            if conn.extra is not None:
                extra_options = conn.extra_dejson
                if "gss_auth" in extra_options \
                        and str(extra_options["gss_auth"]).lower() == 'true':
                    self.gss_auth = True

    def get_conn(self):
        """
        Opens an SSH connection to the remote host.

        :return: paramiko.SSHClient object
        """

        self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        if self.no_host_key_check:
            # Default is RejectPolicy
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        if self.password and self.password.strip():
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           password=self.password,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy)
        else:
            # No password set: let Paramiko try key-based and/or GSSAPI auth
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy,
                           gss_auth=self.gss_auth)

        if self.keepalive_interval:
            client.get_transport().set_keepalive(self.keepalive_interval)

        self.client = client
        return client



class CustomSshPlugin(AirflowPlugin):
    name = "ssh_plugins"
    DEFAULT_PLUGIN = "1.0"
    hooks = [CustomSSHHook]

The above can be used in a DAG as follows (the plugin's name, ssh_plugins, is what exposes the hook under airflow.hooks.ssh_plugins):

from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.hooks.ssh_plugins import CustomSSHHook

edge_node_hook = CustomSSHHook(ssh_conn_id="ssh_con_id",
                               remote_host=host_ip,
                               port=22,
                               timeout=100)
ssh_execution = SSHOperator(dag=dag,
                            task_id='sample_task',
                            ssh_hook=edge_node_hook,
                            command='whoami',
                            do_xcom_push=False)

And you must add the following parameter to the Extra field of the connection with ID 'ssh_con_id':

{"gss_auth":"true"}