Airflow s3 连接使用 UI
Airflow s3 connection using UI
我一直在尝试使用 Airflow 来安排 DAG。
DAG 之一包含从 s3 存储桶加载数据的任务。
出于上述目的,我需要设置 s3 连接。但是气流提供的 UI 并不那么直观 (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections)。是否有人成功设置了 s3 连接(如果有),您是否遵循了任何最佳实践?
谢谢。
编辑:此答案将您的密钥存储在纯文本中,这可能存在安全风险,因此不推荐使用。最好的方法是将访问密钥和秘密密钥放在 login/password 字段中,如以下其他答案所述。
结束编辑
很难找到参考资料,但经过一番挖掘后我能够让它发挥作用。
TLDR
创建具有以下属性的新连接:
连接 ID: my_conn_S3
连接类型: S3
额外:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
长版本,正在设置 UI 连接:
- 在 Airflow UI 上,转到管理 > 连接
- 创建具有以下属性的新连接:
- 连接 ID:
my_conn_S3
- 连接类型:
S3
- 额外:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
- 将所有其他字段(主机、模式、登录)留空。
要使用此连接,您可以在下面找到一个简单的 S3 传感器测试。这个测试的想法是设置一个传感器来监视 S3 中的文件(T1 任务),一旦满足以下条件,它就会触发 bash 命令(T2 任务)。
测试
- 在 运行 启用 DAG 之前,确保您有一个名为 'S3-Bucket-To-Watch'.
的 S3 存储桶
- 将下面 s3_dag_test.py 添加到 airflow dags 文件夹 (~/airflow/dags)
- 开始
airflow webserver
.
- 转到 Airflow UI (http://localhost:8383/)
- 开始
airflow scheduler
.
- 在主 DAG 视图上打开 's3_dag_test' DAG。
- Select 's3_dag_test' 显示 dag 详细信息。
- 在图表视图中,您应该能够看到它的当前状态。
- 'check_s3_for_file_in_s3' 任务应该处于活动状态并且 运行ning.
- 现在,将名为 'file-to-watch-1' 的文件添加到您的 'S3-Bucket-To-Watch'。
- 第一个任务应该已经完成,第二个应该开始并完成。
dag定义中的schedule_interval设置为'@once',方便调试
再次运行,保持原样,删除存储桶中的文件,然后通过选择第一个任务(在图表视图中)并选择'Clear'所有[=113=来重试],'Future','Upstream','Downstream' .... activity。这应该会再次启动 DAG。
让我知道进展如何。
s3_dag_test.py;
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
主要参考资料:
如果您担心在 UI 中暴露凭据,另一种方法是在 UI 的 Extra 参数中传递凭据文件位置。只有功能用户具有文件的读取权限。
它看起来像下面
Extra: {
"profile": "<profile_name>",
"s3_config_file": "/home/<functional_user>/creds/s3_credentials",
"s3_config_format": "aws" }
文件“/home/<functional_user>/creds/s3_credentials
”包含以下条目
[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
假设气流托管在 EC2 服务器上。
只需按照其他答案创建连接,但除了应保留为 S3 的连接类型外,将配置中的所有内容留空
S3hook 将默认为 boto,这将默认为您 运行 airflow 所在的 EC2 服务器的角色。假设此角色有权访问 S3,您的任务将能够访问存储桶。
这是一种比使用和存储凭据更安全的方法。
对于新版本,更改上面示例中的 python 代码。
s3_conn_id='my_conn_S3'
到
aws_conn_id='my_conn_s3'
对于中国的aws,它不支持airflow==1.8.0
需要更新到 1.9.0
但气流 1.9.0 将名称更改为 apache-airflow==1.9.0
另一个对我有用的选项是将访问密钥作为 "login" 并将秘密密钥作为 "password":
Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>
将所有其他字段留空。
Conn Id: example_s3_connnection
Conn Type: S3
Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}
注意:登录名和密码字段留空。
我们在几个版本前将其添加到我们的文档中:
http://airflow.apache.org/docs/stable/howto/connection/aws.html
AWS 连接和 S3 连接之间没有区别。
这里接受的答案在 extra/JSON 中有密钥和秘密,虽然它仍然有效(从 1.10.10 开始),但不再推荐它,因为它在 [=] 中以纯文本显示秘密20=].
我一直在尝试使用 Airflow 来安排 DAG。 DAG 之一包含从 s3 存储桶加载数据的任务。
出于上述目的,我需要设置 s3 连接。但是气流提供的 UI 并不那么直观 (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections)。是否有人成功设置了 s3 连接(如果有),您是否遵循了任何最佳实践?
谢谢。
编辑:此答案将您的密钥存储在纯文本中,这可能存在安全风险,因此不推荐使用。最好的方法是将访问密钥和秘密密钥放在 login/password 字段中,如以下其他答案所述。 结束编辑
很难找到参考资料,但经过一番挖掘后我能够让它发挥作用。
TLDR
创建具有以下属性的新连接:
连接 ID: my_conn_S3
连接类型: S3
额外:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
长版本,正在设置 UI 连接:
- 在 Airflow UI 上,转到管理 > 连接
- 创建具有以下属性的新连接:
- 连接 ID:
my_conn_S3
- 连接类型:
S3
- 额外:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
- 将所有其他字段(主机、模式、登录)留空。
要使用此连接,您可以在下面找到一个简单的 S3 传感器测试。这个测试的想法是设置一个传感器来监视 S3 中的文件(T1 任务),一旦满足以下条件,它就会触发 bash 命令(T2 任务)。
测试
- 在 运行 启用 DAG 之前,确保您有一个名为 'S3-Bucket-To-Watch'. 的 S3 存储桶
- 将下面 s3_dag_test.py 添加到 airflow dags 文件夹 (~/airflow/dags)
- 开始
airflow webserver
. - 转到 Airflow UI (http://localhost:8383/)
- 开始
airflow scheduler
. - 在主 DAG 视图上打开 's3_dag_test' DAG。
- Select 's3_dag_test' 显示 dag 详细信息。
- 在图表视图中,您应该能够看到它的当前状态。
- 'check_s3_for_file_in_s3' 任务应该处于活动状态并且 运行ning.
- 现在,将名为 'file-to-watch-1' 的文件添加到您的 'S3-Bucket-To-Watch'。
- 第一个任务应该已经完成,第二个应该开始并完成。
dag定义中的schedule_interval设置为'@once',方便调试
再次运行,保持原样,删除存储桶中的文件,然后通过选择第一个任务(在图表视图中)并选择'Clear'所有[=113=来重试],'Future','Upstream','Downstream' .... activity。这应该会再次启动 DAG。
让我知道进展如何。
s3_dag_test.py;
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
主要参考资料:
如果您担心在 UI 中暴露凭据,另一种方法是在 UI 的 Extra 参数中传递凭据文件位置。只有功能用户具有文件的读取权限。 它看起来像下面
Extra: {
"profile": "<profile_name>",
"s3_config_file": "/home/<functional_user>/creds/s3_credentials",
"s3_config_format": "aws" }
文件“/home/<functional_user>/creds/s3_credentials
”包含以下条目
[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
假设气流托管在 EC2 服务器上。
只需按照其他答案创建连接,但除了应保留为 S3 的连接类型外,将配置中的所有内容留空
S3hook 将默认为 boto,这将默认为您 运行 airflow 所在的 EC2 服务器的角色。假设此角色有权访问 S3,您的任务将能够访问存储桶。
这是一种比使用和存储凭据更安全的方法。
对于新版本,更改上面示例中的 python 代码。
s3_conn_id='my_conn_S3'
到
aws_conn_id='my_conn_s3'
对于中国的aws,它不支持airflow==1.8.0 需要更新到 1.9.0 但气流 1.9.0 将名称更改为 apache-airflow==1.9.0
另一个对我有用的选项是将访问密钥作为 "login" 并将秘密密钥作为 "password":
Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>
将所有其他字段留空。
Conn Id: example_s3_connnection
Conn Type: S3
Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}
注意:登录名和密码字段留空。
我们在几个版本前将其添加到我们的文档中:
http://airflow.apache.org/docs/stable/howto/connection/aws.html
AWS 连接和 S3 连接之间没有区别。
这里接受的答案在 extra/JSON 中有密钥和秘密,虽然它仍然有效(从 1.10.10 开始),但不再推荐它,因为它在 [=] 中以纯文本显示秘密20=].