尝试查询 mssql 数据库时出现气流 Fernet_Key 问题
Airflow Fernet_Key issue when trying to query a mssql db
我是 Airflow 的新手。我已经通读了好几遍文档,翻阅了无数 S/O 问题和许多 运行dom 在线文章,但尚未解决此问题。我有一种感觉,我做错了一些非常简单的事情。
我有 Docker 用于 Windows,我拉取了 puckel/docker-airflow
图像和 运行 一个暴露端口的容器,这样我就可以从我的主机上访问 UI。我有另一个容器 运行 mcr.microsoft.com/mssql/server
,我在其中恢复了 WideWorldImporters 示例数据库。从 Airflow UI,我已经能够成功创建到这个数据库的连接,甚至可以从数据分析部分查询它。检查下面的图片:
Connection Creation
Successful Query to Connection
所以虽然这有效,但我的 dag 在第二个任务中失败了 sqlData
。这是代码:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime
copyData = DAG(
dag_id='copyData',
schedule_interval='@once',
start_date=datetime(2019,1,1)
)
printHelloBash = BashOperator(
task_id = "print_hello_Bash",
bash_command = 'echo "Lets copy some data"',
dag = copyData
)
mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
task_id="select_some_data",
mssql_conn_id=mssqlConnection,
database="WideWorldImporters",
dag = copyData,
depends_on_past=True
)
queryDataSuccess = BashOperator(
task_id = "confirm_data_queried",
bash_command = 'echo "We queried data!"',
dag = copyData
)
printHelloBash >> sqlData >> queryDataSuccess
最初的错误是:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
_fernet = Fernet(fernet_key.encode('utf-8'))
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
return b64decode(s)
File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*
我注意到这与密码学有关,我继续 运行 pip install cryptography
和 pip install airflow[crytpo]
,两者都返回了完全相同的结果,告诉我要求已经已经很满意了。最后,我发现了一些说我只需要生成一个 fernet_key 的东西。我的 airflow.cfg 文件中的默认密钥是 fernet_key = $FERNET_KEY
。所以从容器中的 cli 我 运行:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
并得到了我用 $FERNET_KEY
替换的代码。我重新启动了容器并重新 运行 dag,现在我的错误是:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
h.verify(data[-32:])
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
ctx.verify(signature)
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
初始加密文档扫描中的哪些与兼容性有关?
我现在不知所措,决定问这个问题,看看我在解决这个问题时是否有可能走错路。任何帮助将不胜感激,因为 Airflow 看起来很棒。
感谢@Tomasz 的一些辅助沟通,我终于让我的 DAG 开始工作了。他建议我尝试使用 docker-compose,它也在 puckel/docker-airflow github 存储库中列出。不过,我最终使用了 docker-compose-LocalExecutor.yml 文件而不是 Celery Executor。我还必须进行一些小的故障排除和更多配置。首先,我使用现有的 MSSQL 容器,其中包含示例数据库,并使用 docker commit mssql_container_name
将其转换为图像。我这样做的唯一原因是为了节省恢复备份样本数据库的时间;您可以随时将备份复制到容器中,并在以后根据需要恢复它们。然后我将我的新图像添加到现有的 docker-compose-LocalExecutor.yml 文件中,如下所示:
version: '2.1'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
mssql:
image: dw:latest
ports:
- "1433:1433"
webserver:
image: puckel/docker-airflow:1.10.2
restart: always
depends_on:
- postgres
- mssql
environment:
- LOAD_EX=n
- EXECUTOR=Local
#volumes:
#- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
请注意,dw 是我命名的基于 mssql 容器的新映像。接下来,我将文件重命名为 docker-compose.yml 这样我就可以轻松地 运行 docker-compose up
(不确定是否有直接指向不同 YAML 文件的命令)。一切就绪并 运行ning 后,我导航到 Airflow UI 并配置了我的连接。注意:因为您使用的是 docker-compose,您不需要知道其他容器的 IP 地址,因为它们使用我发现的 DNS 服务发现 。然后为了测试连接,我转到数据分析进行临时查询,但连接不存在。这是因为 puckel/docker-airflow 映像没有安装 pymssql。所以只需 bash 放入容器 docker exec -it airflow_webserver_container bash
并安装它 pip install pymssql --user
。退出容器并使用 docker-compose restart
重新启动所有服务。一分钟后,一切都准备好了 运行ning。我的连接显示在 Ad hoc Query 中,我可以成功 select 数据。最后,我打开了我的 DAG,调度程序接了它,一切都成功了!经过数周的谷歌搜索后,超级松了一口气。感谢@y2k-shubham 的帮助和对@Tomasz 的超级感谢,在他关于 r/datascience subreddit 上的 Airflow 的精彩和透彻 post 之后,我实际上最初联系了他。
我是 Airflow 的新手。我已经通读了好几遍文档,翻阅了无数 S/O 问题和许多 运行dom 在线文章,但尚未解决此问题。我有一种感觉,我做错了一些非常简单的事情。
我有 Docker 用于 Windows,我拉取了 puckel/docker-airflow
图像和 运行 一个暴露端口的容器,这样我就可以从我的主机上访问 UI。我有另一个容器 运行 mcr.microsoft.com/mssql/server
,我在其中恢复了 WideWorldImporters 示例数据库。从 Airflow UI,我已经能够成功创建到这个数据库的连接,甚至可以从数据分析部分查询它。检查下面的图片:
Connection Creation
Successful Query to Connection
所以虽然这有效,但我的 dag 在第二个任务中失败了 sqlData
。这是代码:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime
copyData = DAG(
dag_id='copyData',
schedule_interval='@once',
start_date=datetime(2019,1,1)
)
printHelloBash = BashOperator(
task_id = "print_hello_Bash",
bash_command = 'echo "Lets copy some data"',
dag = copyData
)
mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
task_id="select_some_data",
mssql_conn_id=mssqlConnection,
database="WideWorldImporters",
dag = copyData,
depends_on_past=True
)
queryDataSuccess = BashOperator(
task_id = "confirm_data_queried",
bash_command = 'echo "We queried data!"',
dag = copyData
)
printHelloBash >> sqlData >> queryDataSuccess
最初的错误是:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
_fernet = Fernet(fernet_key.encode('utf-8'))
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
return b64decode(s)
File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*
我注意到这与密码学有关,我继续 运行 pip install cryptography
和 pip install airflow[crytpo]
,两者都返回了完全相同的结果,告诉我要求已经已经很满意了。最后,我发现了一些说我只需要生成一个 fernet_key 的东西。我的 airflow.cfg 文件中的默认密钥是 fernet_key = $FERNET_KEY
。所以从容器中的 cli 我 运行:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
并得到了我用 $FERNET_KEY
替换的代码。我重新启动了容器并重新 运行 dag,现在我的错误是:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
h.verify(data[-32:])
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
ctx.verify(signature)
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
初始加密文档扫描中的哪些与兼容性有关?
我现在不知所措,决定问这个问题,看看我在解决这个问题时是否有可能走错路。任何帮助将不胜感激,因为 Airflow 看起来很棒。
感谢@Tomasz 的一些辅助沟通,我终于让我的 DAG 开始工作了。他建议我尝试使用 docker-compose,它也在 puckel/docker-airflow github 存储库中列出。不过,我最终使用了 docker-compose-LocalExecutor.yml 文件而不是 Celery Executor。我还必须进行一些小的故障排除和更多配置。首先,我使用现有的 MSSQL 容器,其中包含示例数据库,并使用 docker commit mssql_container_name
将其转换为图像。我这样做的唯一原因是为了节省恢复备份样本数据库的时间;您可以随时将备份复制到容器中,并在以后根据需要恢复它们。然后我将我的新图像添加到现有的 docker-compose-LocalExecutor.yml 文件中,如下所示:
version: '2.1'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
mssql:
image: dw:latest
ports:
- "1433:1433"
webserver:
image: puckel/docker-airflow:1.10.2
restart: always
depends_on:
- postgres
- mssql
environment:
- LOAD_EX=n
- EXECUTOR=Local
#volumes:
#- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
请注意,dw 是我命名的基于 mssql 容器的新映像。接下来,我将文件重命名为 docker-compose.yml 这样我就可以轻松地 运行 docker-compose up
(不确定是否有直接指向不同 YAML 文件的命令)。一切就绪并 运行ning 后,我导航到 Airflow UI 并配置了我的连接。注意:因为您使用的是 docker-compose,您不需要知道其他容器的 IP 地址,因为它们使用我发现的 DNS 服务发现 docker exec -it airflow_webserver_container bash
并安装它 pip install pymssql --user
。退出容器并使用 docker-compose restart
重新启动所有服务。一分钟后,一切都准备好了 运行ning。我的连接显示在 Ad hoc Query 中,我可以成功 select 数据。最后,我打开了我的 DAG,调度程序接了它,一切都成功了!经过数周的谷歌搜索后,超级松了一口气。感谢@y2k-shubham 的帮助和对@Tomasz 的超级感谢,在他关于 r/datascience subreddit 上的 Airflow 的精彩和透彻 post 之后,我实际上最初联系了他。