How to add a custom component to Elyra's list of available Airflow operators?

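I'm trying to build my own component based on KubernetesPodOperator. I was able to define the component and add it to the component list, but when I try to run it, I get: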

Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.

along with this traceback:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
    result = await result
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
    response = await PipelineProcessorManager.instance().process(pipeline)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
    res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
    pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
    target_ops = self._cc_pipeline(pipeline, pipeline_name)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
    raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators.  Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.

After looking at the source code, I can see these lines in processor_airflow.py:

    # This specifies the default airflow operators included with Elyra.  Any Airflow-based
    # custom connectors should create/extend the elyra configuration file to include
    # those fully-qualified operator/class names.
    available_airflow_operators = ListTrait(
        CUnicode(),
        ["airflow.operators.slack_operator.SlackAPIPostOperator",
         "airflow.operators.bash_operator.BashOperator",
         "airflow.operators.email_operator.EmailOperator",
         "airflow.operators.http_operator.SimpleHttpOperator",
         "airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
         "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
        help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines.  These operators must
be fully qualified (i.e., prefixed with their package names).
       """,
    ).tag(config=True)

However, I'm not sure whether this list can be extended from the client side.

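The available_airflow_operators list is a configurable trait in Elyra. You have to add the fully-qualified package name of KubernetesPodOperator to this list so that the DAG is created correctly.

To do so, generate a configuration file from the command line with jupyter elyra --generate-config. Open the generated file and add the following line (to keep the file organized, you can put it under the PipelineProcessor(LoggingConfigurable) heading):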

c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")

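If your operator lives in a different package, change the string to the correct fully-qualified name for your use case (make sure it ends with the class name of the desired operator). If you need to add several operators, you can use extend instead of append.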
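As a rough sketch of the extend variant, the snippet below collects the operator names in a list and sanity-checks that each one is fully qualified, i.e. a module path followed by a class name. The helper split_operator_path and the second entry (my_company.operators.custom.MyCustomOperator) are hypothetical and only serve as illustration; they are not part of Elyra or Airflow. The commented-out last line is the only part that belongs in the generated config file, where the object c is supplied by the config loader.

```python
from typing import Tuple


def split_operator_path(qualified_name: str) -> Tuple[str, str]:
    """Split 'pkg.module.ClassName' into ('pkg.module', 'ClassName').

    Hypothetical helper for illustration only; not part of Elyra.
    """
    module_path, sep, class_name = qualified_name.rpartition(".")
    if not sep or not module_path or not class_name:
        raise ValueError(f"'{qualified_name}' is not a fully-qualified class name")
    return module_path, class_name


EXTRA_OPERATORS = [
    "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
    "my_company.operators.custom.MyCustomOperator",  # hypothetical second operator
]

# Print the import statement each fully-qualified name corresponds to.
for name in EXTRA_OPERATORS:
    module_path, class_name = split_operator_path(name)
    print(f"from {module_path} import {class_name}")

# In the generated config file (where `c` is provided by the config loader):
# c.AirflowPipelineProcessor.available_airflow_operators.extend(EXTRA_OPERATORS)
```

A bare class name such as "KubernetesPodOperator" makes the helper raise ValueError, which mirrors the kind of value that is not sufficient for the available_airflow_operators list.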

Edit: here is a link to the relevant documentation.