使用 Google Cloud Composer 的 SFTP

SFTP with Google Cloud Composer

我需要通过 SFTP 通过 Cloud Composer 将文件上传到外部服务器。任务代码如下:


    from airflow import DAG
    from airflow.operators.python_operator import PythonVirtualenvOperator
    from airflow.operators.dummy_operator import DummyOperator
    from datetime import datetime, timedelta
    
    
    def make_sftp():
        import paramiko
        import pysftp
        import os
        from airflow.contrib.hooks.ssh_hook import SSHHook
        import subprocess
    
    
        ssh_hook = SSHHook(ssh_conn_id="conn_id")
        sftp_client = ssh_hook.get_conn().open_sftp()
    
        return 0
    
    
    etl_dag = DAG("dag_test",
                  start_date=datetime.now(tz=local_tz),
                  schedule_interval=None,
                  default_args={
                      "owner": "airflow",
                      "depends_on_past": False,
                      "email_on_failure": False,
                      "email_on_retry": False,
                      "retries": 5,
                      "retry_delay": timedelta(minutes=5)})
    
    sftp = PythonVirtualenvOperator(task_id="sftp",
                                    python_callable=make_sftp,
                                    requirements=["sshtunnel", "paramiko"],
    
                                    dag=etl_dag)
    
    start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
    
    start_pipeline >> sftp

在“conn_id”中,我使用了以下选项:{“no_host_key_check”:“true”},DAG 运行了几秒钟,失败并显示以下消息:

WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks\n[2022-02-10 10:01:59,358] {ssh_hook.py:171} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks\nTraceback (most recent call last):\n  File "/tmp/venvur4zvddz/script.py", line 23, in <module>\n    res = make_sftp(*args, **kwargs)\n  File "/tmp/venvur4zvddz/script.py", line 19, in make_sftp\n    sftp_client = ssh_hook.get_conn().open_sftp()\n  File "/usr/local/lib/airflow/airflow/contrib/hooks/ssh_hook.py", line 194, in get_conn\n    client.connect(**connect_kwargs)\n  File "/opt/python3.6/lib/python3.6/site-packages/paramiko/client.py", line 412, in connect\n    server_key = t.get_remote_server_key()\n  File "/opt/python3.6/lib/python3.6/site-packages/paramiko/transport.py", line 834, in get_remote_server_key\n    raise SSHException("No existing session")\nparamiko.ssh_exception.SSHException: No existing session\n'

我必须设置其他选项吗?谢谢!

使用密钥对身份验证配置 SSH 连接

要以用户名为“user_a”的用户身份通过​​ SSH 连接到主机,应为该用户生成 SSH 密钥对,并将 public 密钥添加到主机。以下是创建与具有写入权限的“jupyter”用户的 SSH 连接的步骤。

  1. 运行 在本地机器上执行以下命令生成所需的 SSH 密钥:
ssh-keygen -t rsa -f ~/.ssh/sftp-ssh-key -C user_a

“sftp-ssh-key” → public 和私钥对的名称(Public key: sftp-ssh-key.pub, Private key: sftp-ssh-key )

“user_a” → 我们尝试连接的虚拟机中的用户

chmod 400 ~/.ssh/sftp-ssh-key
  1. 现在,将 public 键 sftp-ssh-key.pub 的内容复制到主机系统的 ~/.ssh/authorized_keys 中。检查 authorized_keys 的必要权限并相应地使用 chmod.

    授予它们

    我使用 Compute Engine VM 测试了设置。在 Compute Engine 控制台中,编辑 VM 设置以将生成的 SSH public 密钥的内容添加到实例元数据中。可以找到详细说明 here. If you are connecting to a Compute Engine VM, make sure that the instance has the appropriate firewall rule 以允许 SSH 连接。

  2. 将私钥上传到客户端机器。在这种情况下,客户端是 Airflow DAG,因此应该可以从 Composer/Airflow 环境访问密钥文件。要使密钥文件可访问,必须将其上传到与 Composer 环境关联的 GCS 存储桶。例如私钥上传到bucket中的data文件夹,则key文件路径为/home/airflow/gcs/data/sftp-ssh-key.


配置带密码验证的 SSH 连接

如果主机上没有配置密码验证,请按照以下步骤启用密码验证

  1. 使用以下命令设置用户密码并输入两次新密码。
sudo passwd user_a
  1. 要启用 SSH 密码验证,您必须以 root 身份通过 SSH 连接到主机以编辑 sshd_config 文件。
/etc/ssh/sshd_config
  1. 然后,将 PasswordAuthentication no 行更改为 PasswordAuthentication yes。进行更改后,以 root 身份通过 运行 以下命令重新启动 SSH 服务。
sudo service ssh restart

密码验证已配置完成。


正在创建连接并上传 DAG

1.1 带密钥认证的 Airflow 连接

使用以下配置在 Airflow 中创建连接或使用现有连接。

额外字段

Extra JSON 字典看起来像这样。在这里,我们已经将私钥文件上传到 Composer 环境的 GCS 存储桶中的 data 文件夹。

{
   "key_file": "/home/airflow/gcs/data/sftp-ssh-key",
   "conn_timeout": "30",
   "look_for_keys": "false"
}

1.2 带密码验证的 Airflow 连接

如果主机配置为允许密码验证,这些是要在 Airflow 连接中进行的更改。 Extra参数可以为空。

Password参数是主机上user_a的用户密码。

任务日志显示密码验证成功。

INFO - Authentication (password) successful!
  1. 将 DAG 上传到 Composer 环境并触发 DAG。我在使用最新版本的 paramiko=2.9.2 库时遇到了密钥验证问题。我尝试降级 paramiko 但旧版本似乎不支持 OPENSSH 密钥。找到了替代方案 paramiko-ng,其中验证问题已得到修复。在 PythonVirtualenvOperator.
  2. 中将 Python 依赖项从 paramiko 更改为 paramiko-ng
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta


def make_sftp():
    import paramiko
    from airflow.contrib.hooks.ssh_hook import SSHHook

    ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
    sftp_client = ssh_hook.get_conn().open_sftp()
    
    print("=================SFTP Connection Successful=================")
    
    remote_host = "/home/sftp-folder/sample_sftp_file" # file path in the host system
    local_host = "/home/airflow/gcs/data/sample_sftp_file" # file path in the client system
    
    sftp_client.get(remote_host,local_host) # GET operation to copy file from host to client
    
    sftp_client.close()
    return 0


etl_dag = DAG("sftp_dag",
                start_date=datetime.now(),
                schedule_interval=None,
                default_args={
                    "owner": "airflow",
                    "depends_on_past": False,
                    "email_on_failure": False,
                    "email_on_retry": False,
                    "retries": 5,
                    "retry_delay": timedelta(minutes=5)})

sftp = PythonVirtualenvOperator(task_id="sftp",
                                python_callable=make_sftp,
                                requirements=["sshtunnel", "paramiko-ng", "pysftp"],
                                dag=etl_dag)

start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)

start_pipeline >> sftp

结果

sample_sftp_file 已从主机系统复制到指定的 Composer 存储桶。