使用 Google Cloud Composer 的 SFTP
SFTP with Google Cloud Composer
我需要通过 SFTP 通过 Cloud Composer 将文件上传到外部服务器。任务代码如下:
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
def make_sftp():
import paramiko
import pysftp
import os
from airflow.contrib.hooks.ssh_hook import SSHHook
import subprocess
ssh_hook = SSHHook(ssh_conn_id="conn_id")
sftp_client = ssh_hook.get_conn().open_sftp()
return 0
etl_dag = DAG("dag_test",
start_date=datetime.now(tz=local_tz),
schedule_interval=None,
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 5,
"retry_delay": timedelta(minutes=5)})
sftp = PythonVirtualenvOperator(task_id="sftp",
python_callable=make_sftp,
requirements=["sshtunnel", "paramiko"],
dag=etl_dag)
start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
start_pipeline >> sftp
在“conn_id”中,我使用了以下选项:{“no_host_key_check”:“true”},DAG 运行了几秒钟,失败并显示以下消息:
WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks\n[2022-02-10 10:01:59,358] {ssh_hook.py:171} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks\nTraceback (most recent call last):\n File "/tmp/venvur4zvddz/script.py", line 23, in <module>\n res = make_sftp(*args, **kwargs)\n File "/tmp/venvur4zvddz/script.py", line 19, in make_sftp\n sftp_client = ssh_hook.get_conn().open_sftp()\n File "/usr/local/lib/airflow/airflow/contrib/hooks/ssh_hook.py", line 194, in get_conn\n client.connect(**connect_kwargs)\n File "/opt/python3.6/lib/python3.6/site-packages/paramiko/client.py", line 412, in connect\n server_key = t.get_remote_server_key()\n File "/opt/python3.6/lib/python3.6/site-packages/paramiko/transport.py", line 834, in get_remote_server_key\n raise SSHException("No existing session")\nparamiko.ssh_exception.SSHException: No existing session\n'
我必须设置其他选项吗?谢谢!
使用密钥对身份验证配置 SSH 连接
要以用户名为“user_a”的用户身份通过 SSH 连接到主机,应为该用户生成 SSH 密钥对,并将 public 密钥添加到主机。以下是创建与具有写入权限的“jupyter”用户的 SSH 连接的步骤。
- 运行 在本地机器上执行以下命令生成所需的 SSH 密钥:
ssh-keygen -t rsa -f ~/.ssh/sftp-ssh-key -C user_a
“sftp-ssh-key” → public 和私钥对的名称(Public key: sftp-ssh-key.pub, Private key: sftp-ssh-key )
“user_a” → 我们尝试连接的虚拟机中的用户
chmod 400 ~/.ssh/sftp-ssh-key
现在,将 public 键 sftp-ssh-key.pub
的内容复制到主机系统的 ~/.ssh/authorized_keys
中。检查 authorized_keys
的必要权限并相应地使用 chmod
.
授予它们
我使用 Compute Engine VM 测试了设置。在 Compute Engine 控制台中,编辑 VM 设置以将生成的 SSH public 密钥的内容添加到实例元数据中。可以找到详细说明 here. If you are connecting to a Compute Engine VM, make sure that the instance has the appropriate firewall rule 以允许 SSH 连接。
将私钥上传到客户端机器。在这种情况下,客户端是 Airflow DAG,因此应该可以从 Composer/Airflow 环境访问密钥文件。要使密钥文件可访问,必须将其上传到与 Composer 环境关联的 GCS 存储桶。例如私钥上传到bucket中的data
文件夹,则key文件路径为/home/airflow/gcs/data/sftp-ssh-key
.
配置带密码验证的 SSH 连接
如果主机上没有配置密码验证,请按照以下步骤启用密码验证。
- 使用以下命令设置用户密码并输入两次新密码。
sudo passwd user_a
- 要启用 SSH 密码验证,您必须以 root 身份通过 SSH 连接到主机以编辑
sshd_config
文件。
/etc/ssh/sshd_config
- 然后,将
PasswordAuthentication no
行更改为 PasswordAuthentication yes
。进行更改后,以 root 身份通过 运行 以下命令重新启动 SSH 服务。
sudo service ssh restart
密码验证已配置完成。
正在创建连接并上传 DAG
1.1 带密钥认证的 Airflow 连接
使用以下配置在 Airflow 中创建连接或使用现有连接。
额外字段
Extra JSON 字典看起来像这样。在这里,我们已经将私钥文件上传到 Composer 环境的 GCS 存储桶中的 data
文件夹。
{
"key_file": "/home/airflow/gcs/data/sftp-ssh-key",
"conn_timeout": "30",
"look_for_keys": "false"
}
1.2 带密码验证的 Airflow 连接
如果主机配置为允许密码验证,这些是要在 Airflow 连接中进行的更改。
Extra
参数可以为空。
Password
参数是主机上user_a的用户密码。
任务日志显示密码验证成功。
INFO - Authentication (password) successful!
- 将 DAG 上传到 Composer 环境并触发 DAG。我在使用最新版本的
paramiko=2.9.2
库时遇到了密钥验证问题。我尝试降级 paramiko
但旧版本似乎不支持 OPENSSH 密钥。找到了替代方案 paramiko-ng
,其中验证问题已得到修复。在 PythonVirtualenvOperator
. 中将 Python 依赖项从 paramiko
更改为 paramiko-ng
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
def make_sftp():
import paramiko
from airflow.contrib.hooks.ssh_hook import SSHHook
ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
sftp_client = ssh_hook.get_conn().open_sftp()
print("=================SFTP Connection Successful=================")
remote_host = "/home/sftp-folder/sample_sftp_file" # file path in the host system
local_host = "/home/airflow/gcs/data/sample_sftp_file" # file path in the client system
sftp_client.get(remote_host,local_host) # GET operation to copy file from host to client
sftp_client.close()
return 0
etl_dag = DAG("sftp_dag",
start_date=datetime.now(),
schedule_interval=None,
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 5,
"retry_delay": timedelta(minutes=5)})
sftp = PythonVirtualenvOperator(task_id="sftp",
python_callable=make_sftp,
requirements=["sshtunnel", "paramiko-ng", "pysftp"],
dag=etl_dag)
start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
start_pipeline >> sftp
结果
sample_sftp_file
已从主机系统复制到指定的 Composer 存储桶。
我需要通过 SFTP 通过 Cloud Composer 将文件上传到外部服务器。任务代码如下:
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
def make_sftp():
import paramiko
import pysftp
import os
from airflow.contrib.hooks.ssh_hook import SSHHook
import subprocess
ssh_hook = SSHHook(ssh_conn_id="conn_id")
sftp_client = ssh_hook.get_conn().open_sftp()
return 0
etl_dag = DAG("dag_test",
start_date=datetime.now(tz=local_tz),
schedule_interval=None,
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 5,
"retry_delay": timedelta(minutes=5)})
sftp = PythonVirtualenvOperator(task_id="sftp",
python_callable=make_sftp,
requirements=["sshtunnel", "paramiko"],
dag=etl_dag)
start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
start_pipeline >> sftp
在“conn_id”中,我使用了以下选项:{“no_host_key_check”:“true”},DAG 运行了几秒钟,失败并显示以下消息:
WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks\n[2022-02-10 10:01:59,358] {ssh_hook.py:171} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks\nTraceback (most recent call last):\n File "/tmp/venvur4zvddz/script.py", line 23, in <module>\n res = make_sftp(*args, **kwargs)\n File "/tmp/venvur4zvddz/script.py", line 19, in make_sftp\n sftp_client = ssh_hook.get_conn().open_sftp()\n File "/usr/local/lib/airflow/airflow/contrib/hooks/ssh_hook.py", line 194, in get_conn\n client.connect(**connect_kwargs)\n File "/opt/python3.6/lib/python3.6/site-packages/paramiko/client.py", line 412, in connect\n server_key = t.get_remote_server_key()\n File "/opt/python3.6/lib/python3.6/site-packages/paramiko/transport.py", line 834, in get_remote_server_key\n raise SSHException("No existing session")\nparamiko.ssh_exception.SSHException: No existing session\n'
我必须设置其他选项吗?谢谢!
使用密钥对身份验证配置 SSH 连接
要以用户名为“user_a”的用户身份通过 SSH 连接到主机,应为该用户生成 SSH 密钥对,并将 public 密钥添加到主机。以下是创建与具有写入权限的“jupyter”用户的 SSH 连接的步骤。
- 运行 在本地机器上执行以下命令生成所需的 SSH 密钥:
ssh-keygen -t rsa -f ~/.ssh/sftp-ssh-key -C user_a
“sftp-ssh-key” → public 和私钥对的名称(Public key: sftp-ssh-key.pub, Private key: sftp-ssh-key )
“user_a” → 我们尝试连接的虚拟机中的用户
chmod 400 ~/.ssh/sftp-ssh-key
现在,将 public 键
授予它们sftp-ssh-key.pub
的内容复制到主机系统的~/.ssh/authorized_keys
中。检查authorized_keys
的必要权限并相应地使用chmod
.我使用 Compute Engine VM 测试了设置。在 Compute Engine 控制台中,编辑 VM 设置以将生成的 SSH public 密钥的内容添加到实例元数据中。可以找到详细说明 here. If you are connecting to a Compute Engine VM, make sure that the instance has the appropriate firewall rule 以允许 SSH 连接。
将私钥上传到客户端机器。在这种情况下,客户端是 Airflow DAG,因此应该可以从 Composer/Airflow 环境访问密钥文件。要使密钥文件可访问,必须将其上传到与 Composer 环境关联的 GCS 存储桶。例如私钥上传到bucket中的
data
文件夹,则key文件路径为/home/airflow/gcs/data/sftp-ssh-key
.
配置带密码验证的 SSH 连接
如果主机上没有配置密码验证,请按照以下步骤启用密码验证。
- 使用以下命令设置用户密码并输入两次新密码。
sudo passwd user_a
- 要启用 SSH 密码验证,您必须以 root 身份通过 SSH 连接到主机以编辑
sshd_config
文件。
/etc/ssh/sshd_config
- 然后,将
PasswordAuthentication no
行更改为PasswordAuthentication yes
。进行更改后,以 root 身份通过 运行 以下命令重新启动 SSH 服务。
sudo service ssh restart
密码验证已配置完成。
正在创建连接并上传 DAG
1.1 带密钥认证的 Airflow 连接
使用以下配置在 Airflow 中创建连接或使用现有连接。
额外字段
Extra JSON 字典看起来像这样。在这里,我们已经将私钥文件上传到 Composer 环境的 GCS 存储桶中的 data
文件夹。
{
"key_file": "/home/airflow/gcs/data/sftp-ssh-key",
"conn_timeout": "30",
"look_for_keys": "false"
}
1.2 带密码验证的 Airflow 连接
如果主机配置为允许密码验证,这些是要在 Airflow 连接中进行的更改。
Extra
参数可以为空。
Password
参数是主机上user_a的用户密码。
任务日志显示密码验证成功。
INFO - Authentication (password) successful!
- 将 DAG 上传到 Composer 环境并触发 DAG。我在使用最新版本的
paramiko=2.9.2
库时遇到了密钥验证问题。我尝试降级paramiko
但旧版本似乎不支持 OPENSSH 密钥。找到了替代方案paramiko-ng
,其中验证问题已得到修复。在PythonVirtualenvOperator
. 中将 Python 依赖项从
paramiko
更改为 paramiko-ng
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
def make_sftp():
import paramiko
from airflow.contrib.hooks.ssh_hook import SSHHook
ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
sftp_client = ssh_hook.get_conn().open_sftp()
print("=================SFTP Connection Successful=================")
remote_host = "/home/sftp-folder/sample_sftp_file" # file path in the host system
local_host = "/home/airflow/gcs/data/sample_sftp_file" # file path in the client system
sftp_client.get(remote_host,local_host) # GET operation to copy file from host to client
sftp_client.close()
return 0
etl_dag = DAG("sftp_dag",
start_date=datetime.now(),
schedule_interval=None,
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 5,
"retry_delay": timedelta(minutes=5)})
sftp = PythonVirtualenvOperator(task_id="sftp",
python_callable=make_sftp,
requirements=["sshtunnel", "paramiko-ng", "pysftp"],
dag=etl_dag)
start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
start_pipeline >> sftp
结果
sample_sftp_file
已从主机系统复制到指定的 Composer 存储桶。