Airflow - 在 DockerOperator 中使用 run_id 作为卷名
Airflow - Use run_id as volume name in DockerOperator
我在Apache Airflow 中写了一个DockerOperator,我想给它一个卷。到目前为止,一切都很好。这是一个例子:
t = DockerOperator(
task_id='test',
image='testimage:latest',
command='python3 /code/test.py',
volumes=["/mnt/interim:/interim"],
xcom_push=True,
dag=dag,
)
我遇到的问题如下:
挂载目录的名称要灵活。因此,我想挂载一个名称中带有 run_id 的目录。
volumes=["/mnt/interim/" + "{{ run_id }}" + ":/interim"]
然而,Airflow 似乎无法解析卷中的“{{ run_id }}”,而只能解析 DockerOperator 的命令。
简而言之,我想获取 run_id 以便安装它。
请注意,使用气流变量(气流的环境变量)不会解决问题,因为如果任务并行运行,这个变量可能会被覆盖。
也许你们中有人已经知道可以执行此操作的高级 DockerOperator (CustomOperator)。
提前致谢:)
感谢 Johannes 提出问题。
您尝试实现的目标是可能的,但由于这不是一个非常常见的用例,因此默认情况下未启用。 template_fields
可迭代对象中的参数由 Airflow 模板化。 volumes
字段不在那里,因此没有被拾取。
最简单的方法是复制项目中的 docker_operator.py,然后将 volumes
字段添加到列表中:https://github.com/apache/airflow/blob/master/airflow/operators/docker_operator.py#L126:
template_fields = ('command', 'environment', 'container_name', 'volumes',)
你也可以开工单把这个合并上游,但我不确定有多少用户会模板化这个字段。希望这有帮助。
我在Apache Airflow 中写了一个DockerOperator,我想给它一个卷。到目前为止,一切都很好。这是一个例子:
t = DockerOperator(
task_id='test',
image='testimage:latest',
command='python3 /code/test.py',
volumes=["/mnt/interim:/interim"],
xcom_push=True,
dag=dag,
)
我遇到的问题如下:
挂载目录的名称要灵活。因此,我想挂载一个名称中带有 run_id 的目录。
volumes=["/mnt/interim/" + "{{ run_id }}" + ":/interim"]
然而,Airflow 似乎无法解析卷中的“{{ run_id }}”,而只能解析 DockerOperator 的命令。
简而言之,我想获取 run_id 以便安装它。
请注意,使用气流变量(气流的环境变量)不会解决问题,因为如果任务并行运行,这个变量可能会被覆盖。
也许你们中有人已经知道可以执行此操作的高级 DockerOperator (CustomOperator)。
提前致谢:)
感谢 Johannes 提出问题。
您尝试实现的目标是可能的,但由于这不是一个非常常见的用例,因此默认情况下未启用。 template_fields
可迭代对象中的参数由 Airflow 模板化。 volumes
字段不在那里,因此没有被拾取。
最简单的方法是复制项目中的 docker_operator.py,然后将 volumes
字段添加到列表中:https://github.com/apache/airflow/blob/master/airflow/operators/docker_operator.py#L126:
template_fields = ('command', 'environment', 'container_name', 'volumes',)
你也可以开工单把这个合并上游,但我不确定有多少用户会模板化这个字段。希望这有帮助。