Apache AIrflow KubernetesExecutor 和 KubernetesPodOperator:xcom 推送不工作
Apache AIrflow KubernetesExecutor and KubernetesPodOperator: xcom pushes not working
已将 Apache Airflow 实例放置在 kubernetes 集群中:webserver、scheduler 和 postgresql。使用基于 bitnami 的自定义 helm 图表并进行一些更改。
Airflow 正在与 KubernetesExecutor 一起工作。我所有的 DAG 都是 PythonOperator 和 KubernetesPodOperator(前 DockerOperator - 在 k8s 之前)。 Xcom 推送仅与 PythonOperator 一起正常工作,但与 KubernetesPodOperator 一起我在执行结束时遇到错误(所有 dags 都受到影响):
[2019-12-06 15:12:40,116] {logging_mixin.py:112} INFO - [2019-12-06 15:12:40,116] {pod_launcher.py:217} INFO - Running command... cat /airflow/xcom/return.json
[2019-12-06 15:12:40,201] {logging_mixin.py:112} INFO - [2019-12-06 15:12:40,201] {pod_launcher.py:224} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
所以好像没有创建这个文件。
我也尝试过重写 post_execute
方法来在那里创建这个文件并 json.dump 结果,但它没有帮助 - 这个错误仍然存在。
如有任何解决方法的建议,我们将不胜感激。
更新:我也已将此代码复制粘贴到我的 DAG https://github.com/apache/airflow/blob/36f3bfb0619cc78698280f6ec3bc985f84e58343/tests/contrib/minikube/test_kubernetes_pod_operator.py#L315,但即使使用 apache/airflow 代码进行单元测试,我仍然遇到此错误。
另外不得不提的是我的kubernetes版本是1.11.10。 Minikube 1.5.2
将数据库 (PostgreSQL) 依赖项和版本更改为较新版本并使其正常运行。
默认情况下,KubernetesPodOperator
的 xcom_push
参数是 True
,这会导致 AirFlow 尝试从已执行的容器中读取 /airflow/xcom/return.json
。只需将其更改为 False
:
KubernetesPodOperator(
....
xcom_push=False
)
已将 Apache Airflow 实例放置在 kubernetes 集群中:webserver、scheduler 和 postgresql。使用基于 bitnami 的自定义 helm 图表并进行一些更改。
Airflow 正在与 KubernetesExecutor 一起工作。我所有的 DAG 都是 PythonOperator 和 KubernetesPodOperator(前 DockerOperator - 在 k8s 之前)。 Xcom 推送仅与 PythonOperator 一起正常工作,但与 KubernetesPodOperator 一起我在执行结束时遇到错误(所有 dags 都受到影响):
[2019-12-06 15:12:40,116] {logging_mixin.py:112} INFO - [2019-12-06 15:12:40,116] {pod_launcher.py:217} INFO - Running command... cat /airflow/xcom/return.json
[2019-12-06 15:12:40,201] {logging_mixin.py:112} INFO - [2019-12-06 15:12:40,201] {pod_launcher.py:224} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
所以好像没有创建这个文件。
我也尝试过重写 post_execute
方法来在那里创建这个文件并 json.dump 结果,但它没有帮助 - 这个错误仍然存在。
如有任何解决方法的建议,我们将不胜感激。
更新:我也已将此代码复制粘贴到我的 DAG https://github.com/apache/airflow/blob/36f3bfb0619cc78698280f6ec3bc985f84e58343/tests/contrib/minikube/test_kubernetes_pod_operator.py#L315,但即使使用 apache/airflow 代码进行单元测试,我仍然遇到此错误。
另外不得不提的是我的kubernetes版本是1.11.10。 Minikube 1.5.2
将数据库 (PostgreSQL) 依赖项和版本更改为较新版本并使其正常运行。
默认情况下,KubernetesPodOperator
的 xcom_push
参数是 True
,这会导致 AirFlow 尝试从已执行的容器中读取 /airflow/xcom/return.json
。只需将其更改为 False
:
KubernetesPodOperator(
....
xcom_push=False
)