KubernetesPodOperator - 运行 从 XCOM 中提取的整个命令

KubernetesPodOperator - run entire command pulled from XCOM

我正在研究 Airflow 1.10。

我在 KubernetesPodOperator 上使用 运行ning 命令时遇到问题,其中整个命令在 DAG 运行 时间期间进行评估。

我正在 DAG 运行time 中生成命令,因为一些命令的参数取决于用户传递的参数。

正如我从文档中读到的 KubernetesPodOperator 需要字符串列表或 jinja 模板列表:

    :param arguments: arguments of the entrypoint. (templated)
        The docker image's CMD is used if this is not provided.

我有生成命令并将其推送到 XCOM 和 KubernetesPodOperator 的 PythonOperator 在参数中,我传递由 PythonOperator 生成的命令。

from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator


def command_maker():
    import random # random is to illustrate that we don't know arguments value before runtime
    return f"my_command {random.randint(1, 10)} --option {random.randint(1, 4)}"

def create_tasks(dag):
    first = PythonOperator(
        task_id="generate_command",
        python_callable=command_maker,
        provide_context=True,
        dag=dag,
    )
    second = KubernetesPodOperator(
        namespace='some_namespace',
        image='some_image',
        name='execute_command',
        dag=dag,
        arguments=[f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="return_value")}}']
    )
    second.set_upstream(first)

不幸的是,KubernetesPodOperator 没有 运行 这个命令正确,因为他试图 运行 像这样的东西:

[my_command 4 --option 2]

有没有办法在 KubernetesPodOperator 运行time 评估这个列表 或者我是否被迫将所有 运行time 参数推入单独的 XCOM? 我想避免这样的解决方案,因为它需要在我的项目中进行大量更改。

         arguments=[
            "my_command",
            f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="first_argument")}}',
            "--option",
            f'{{ ti.xcom_pull(dag_id="{dag.dag_id}", task_ids="generate_command", key="second_argument")}}',
         ]

问题是 JINJA 模板 return 默认情况下模板是字符串。

然而,在最近的 Airflow 中(从 Airlfow 2.1.0 开始)您可以将模板呈现为原生 python 对象:

https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#rendering-fields-as-native-python-objects

通过在创建 DAG 时使用 render_template_as_native_obj=True 参数。

然后您需要按照 python 的 literal_eval 能够将其转换为 python 对象的方式格式化您的输出。在您的情况下,您必须使输出类似于:

[ 'my_command', '4', '--option', '2' ]

请注意,此参数将为所有模板 return 本机对象,因此如果它们 return 某些值 literal_eval understands - 它们也将被转换为本机类型(并且您可能有一些意想不到的副作用。

为了获得 Airflow 1.10 的工作解决方案,我不得不使用 BaseOperator.pre_execute 钩子。

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.lineage import prepare_lineage

class UnpackCommandKubernetesPodOperator(KubernetesPodOperator):
    @prepare_lineage
    def pre_execute(self, context):
        self.arguments = self.arguments[0].split(" ")