Airflow 2 conn URI: JSON extra field for AWS in Secrets Manager
I have AWS Secrets Manager set up as my secrets backend in Airflow 2.
I defined an aws_default connection in Secrets Manager (as plaintext):
aws:///extra?region_name=us-east-1&session_kwargs={"profile_name": "my_profile"}
When I call a hook with it (AwsGlueCrawlerHook(aws_conn_id='aws_default')), I get the following error:
Traceback (most recent call last):
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
error_file=args.error_file,
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/home/airflow/airflow/dags/reboots/operators/start_glue_crawler_operator.py", line 29, in execute
AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id).start_crawler(crawler_name=self.crawler_name)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 120, in start_crawler
crawler = self.glue_client.start_crawler(Name=crawler_name)
File "/home/airflow/venv/lib64/python3.7/site-packages/cached_property.py", line 36, in __get__
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 48, in glue_client
return self.get_conn()
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 494, in get_conn
return self.conn
File "/home/airflow/venv/lib64/python3.7/site-packages/cached_property.py", line 36, in __get__
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 476, in conn
return self.get_client_type(self.client_type, region_name=self.region_name)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 442, in get_client_type
session, endpoint_url = self._get_credentials(region_name)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 418, in _get_credentials
conn=connection_object, region_name=region_name, config=self.config
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 74, in create_session
self.basic_session = self._create_basic_session(session_kwargs=session_kwargs)
File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 100, in _create_basic_session
**session_kwargs,
TypeError: type object argument after ** must be a mapping, not str
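I can't figure out how to form my Airflow conn URI so that it ends up being loaded correctly (that is, as a dict rather than a string).
I've tried escaping the quotes and so on, and I just can't work it out. It doesn't error if I use only this: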
aws:///extra?region_name=us-east-1
So I know the problem is in how I'm writing the session_kwargs parameter.
I know I could change full_url_mode to false in backend_kwargs, but at this point I'm really curious how to write the conn URI.
Well, I found the answer here: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
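import json

from airflow.models.connection import Connection

# Nested extras are built as a plain dict first.
extra = {
    "config_kwargs": {
        "proxies": {
            "http": "http://user:pwd@proxy.net:3128",
            "https": "http://user:pwd@proxy.net:3128",
        }
    }
}

c = Connection(
    conn_id="some_conn",
    conn_type="aws",
    description="",
    host="",
    login="",
    password="",
    extra=json.dumps(extra),  # extra is stored as a JSON string
)

# Print the URI form of this connection, with the extras encoded by Airflow.
print(c.get_uri())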
It needs to be URL-encoded, but not entirely. In any case, if you have JSON that needs encoding, Airflow gives you the tooling to do it.
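To see why the original URI produced a str instead of a dict, here is a minimal sketch using only the Python standard library. It does not call Airflow: make_session is a hypothetical stand-in for the session constructor in the traceback, and the aws:///? URI shape is only illustrative. How Airflow itself decodes the extras depends on the installed Airflow and provider versions, so treat the output of c.get_uri() above as the authority for the exact format.

```python
import json
from urllib.parse import parse_qsl, quote, urlsplit

session_kwargs = {"profile_name": "my_profile"}

# Percent-encode the JSON text so it can sit safely inside a query string.
encoded = quote(json.dumps(session_kwargs), safe="")
uri = f"aws:///?region_name=us-east-1&session_kwargs={encoded}"
print(uri)

# Parsing a query string always yields plain strings, never nested objects.
params = dict(parse_qsl(urlsplit(uri).query))
raw = params["session_kwargs"]
print(type(raw).__name__, raw)


def make_session(**kwargs):
    """Hypothetical stand-in for a callable that takes keyword arguments."""
    return kwargs


# Unpacking a str with ** raises the same kind of TypeError as in the traceback.
try:
    make_session(**raw)
except TypeError as exc:
    print("TypeError:", exc)

# Only an explicit json.loads turns the value back into a mapping.
print(make_session(**json.loads(raw)))
```

The point of the sketch is that percent-encoding alone only makes the JSON safe to transport; something on the receiving side still has to parse it back into a dict, which is why generating the URI with Airflow's own Connection class is the safer route.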