如何将自定义组件添加到 Elyra 的可用气流运算符列表中?
How to add custom component to Elyra's list of available airflow operators?
尝试基于 KubernetesPodOperator 制作自己的组件。我能够定义组件并将其添加到组件列表中,但是当尝试 运行 它时,我得到:
Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
和错误:
Traceback (most recent call last):
File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
result = await result
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
response = await PipelineProcessorManager.instance().process(pipeline)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
target_ops = self._cc_pipeline(pipeline, pipeline_name)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
查看src代码后,我可以在processor_airflow.py中看到这些行:
# This specifies the default airflow operators included with Elyra. Any Airflow-based
# custom connectors should create/extend the elyra configuration file to include
# those fully-qualified operator/class names.
available_airflow_operators = ListTrait(
CUnicode(),
["airflow.operators.slack_operator.SlackAPIPostOperator",
"airflow.operators.bash_operator.BashOperator",
"airflow.operators.email_operator.EmailOperator",
"airflow.operators.http_operator.SimpleHttpOperator",
"airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
"airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines. These operators must
be fully qualified (i.e., prefixed with their package names).
""",
).tag(config=True)
虽然我不确定这是否可以从客户端扩展。
available_airflow_operators 列表是 Elyra 中的 configurable trait。您必须将 KubernetesPodOperator
的 fully-qualified 包名称添加到此列表,以便它正确创建 DAG。
为此,请使用 jupyter elyra --generate-config
从命令行生成一个配置文件。打开创建的文件并添加以下行(如果您希望保持文件井井有条,可以将其添加到 PipelineProcessor(LoggingConfigurable)
标题下):
c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")
如果该字符串值不是上述值,请将该字符串值更改为适用于您的用例的正确包(确保它以所需运算符的 class 名称结尾)。如果需要添加多个包,也可以使用extend
而不是append
。
编辑:here 是 link 相关文档
尝试基于 KubernetesPodOperator 制作自己的组件。我能够定义组件并将其添加到组件列表中,但是当尝试 运行 它时,我得到:
Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
和错误:
Traceback (most recent call last):
File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
result = await result
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
response = await PipelineProcessorManager.instance().process(pipeline)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
target_ops = self._cc_pipeline(pipeline, pipeline_name)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
查看src代码后,我可以在processor_airflow.py中看到这些行:
# This specifies the default airflow operators included with Elyra. Any Airflow-based
# custom connectors should create/extend the elyra configuration file to include
# those fully-qualified operator/class names.
available_airflow_operators = ListTrait(
CUnicode(),
["airflow.operators.slack_operator.SlackAPIPostOperator",
"airflow.operators.bash_operator.BashOperator",
"airflow.operators.email_operator.EmailOperator",
"airflow.operators.http_operator.SimpleHttpOperator",
"airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
"airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines. These operators must
be fully qualified (i.e., prefixed with their package names).
""",
).tag(config=True)
虽然我不确定这是否可以从客户端扩展。
available_airflow_operators 列表是 Elyra 中的 configurable trait。您必须将 KubernetesPodOperator
的 fully-qualified 包名称添加到此列表,以便它正确创建 DAG。
为此,请使用 jupyter elyra --generate-config
从命令行生成一个配置文件。打开创建的文件并添加以下行(如果您希望保持文件井井有条,可以将其添加到 PipelineProcessor(LoggingConfigurable)
标题下):
c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")
如果该字符串值不是上述值,请将该字符串值更改为适用于您的用例的正确包(确保它以所需运算符的 class 名称结尾)。如果需要添加多个包,也可以使用extend
而不是append
。
编辑:here 是 link 相关文档