如何定义 STFP Operator 在 Airflow 上的操作?
How to define operations of an STFP Operator on Airflow?
class SFTPOperation(object):
PUT = 'put'
GET = 'get'
operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined
我在这里定义了操作符,但我在网上找不到任何与操作相关的东西
class sftpplugin(AirflowPlugin):
name = "sftp_plugin"
operators = [SFTPOperator]
如有任何帮助,我们将不胜感激!
谢谢,
注意到 SFTP 运算符使用 ssh_hook 打开 sftp 传输通道,您应该需要提供 ssh_hook
或 ssh_conn_id
进行文件传输。首先,让我们看一个提供参数 ssh_conn_id
.
的例子
from airflow.providers.sftp.operators import sftp_operator
from airflow import DAG
import datetime
dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)
put_operation = SFTPOperator(
task_id="operation",
ssh_conn_id="ssh_default",
local_filepath="route_to_local_file",
remote_filepath="remote_route_to_copy",
operation="put",
dag=dag
)
get_operation = SFTPOperator(....,
operation = "get",
dag = dag
)
put_operation >> get_operation
请注意,dag 应该根据您的任务的需要进行安排,这里的示例考虑了从中午开始的每日安排。现在,如果您提供 SSHhook,则需要对上述代码进行以下更改
from airflow.contrib.hooks.ssh_hook import SSHHook
...
put_operation = SFTPOperator(
task_id="operation",
ssh_hook=SSHHook("Name_of_variable_defined"),
...
dag=dag
)
....
其中"Name_of_variable_defined"
是在Airflow界面的Admin -> Connections中创建的
class SFTPOperation(object):
PUT = 'put'
GET = 'get'
operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined
我在这里定义了操作符,但我在网上找不到任何与操作相关的东西
class sftpplugin(AirflowPlugin):
name = "sftp_plugin"
operators = [SFTPOperator]
如有任何帮助,我们将不胜感激!
谢谢,
注意到 SFTP 运算符使用 ssh_hook 打开 sftp 传输通道,您应该需要提供 ssh_hook
或 ssh_conn_id
进行文件传输。首先,让我们看一个提供参数 ssh_conn_id
.
from airflow.providers.sftp.operators import sftp_operator
from airflow import DAG
import datetime
dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)
put_operation = SFTPOperator(
task_id="operation",
ssh_conn_id="ssh_default",
local_filepath="route_to_local_file",
remote_filepath="remote_route_to_copy",
operation="put",
dag=dag
)
get_operation = SFTPOperator(....,
operation = "get",
dag = dag
)
put_operation >> get_operation
请注意,dag 应该根据您的任务的需要进行安排,这里的示例考虑了从中午开始的每日安排。现在,如果您提供 SSHhook,则需要对上述代码进行以下更改
from airflow.contrib.hooks.ssh_hook import SSHHook
...
put_operation = SFTPOperator(
task_id="operation",
ssh_hook=SSHHook("Name_of_variable_defined"),
...
dag=dag
)
....
其中"Name_of_variable_defined"
是在Airflow界面的Admin -> Connections中创建的