在气流 DAG 中出现错误,>> 不受支持的操作数类型:'list' 和 'list'。任务的顺序和并行执行
Getting Error in airflow DAG unsupported operand type(s) for >>: 'list' and 'list'. Sequential and Parallel execution of tasks
我是 Apache airflow 和 DAG 的新手。 DAG 中共有 6 个任务(task1、task2、task3、task4、task5、task6)。但是在 运行 DAG 时,我们收到以下错误。
>> 的 DAG 操作数类型不受支持:'list' 和 'list'
下面是我的 DAG 代码。请帮忙。我是气流新手。
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False
}
dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 14))
#################### CREATE TASK #####################################
task_1 = DatabricksSubmitRunOperator(
task_id='task_1',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_1/task_1.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_1.driver.TestClass1',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_2 = DatabricksSubmitRunOperator(
task_id='task_2',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_2/task_2.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_2.driver.TestClass2',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_3 = DatabricksSubmitRunOperator(
task_id='task_3',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_3/task_3.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_3.driver.TestClass3',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_4 = DatabricksSubmitRunOperator(
task_id='task_4',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_4/task_4.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_4.driver.TestClass4',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_5 = DatabricksSubmitRunOperator(
task_id='task_5',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_5/task_5.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_5.driver.TestClass5',
'parameters' : [
'json ={{ dag_run.conf.json }}'
]
}
)
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
#################### ORDER OF OPERATORS ###########################
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6
Airflow 任务依赖项无法处理 [list]>>[list]。解决此问题的最简单方法是在多行中指定您的依赖项:
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5]
task_3 >> [task_4, task_5]
[task_4 , task_5 ] >> task_6
您想要的任务相关性是什么?您是要 运行 task_4
仅在 task_2
之后还是在 task_2
和 task_3
之后
根据该答案,使用以下选项之一:
(如果 task_4 应该 运行 在 task_2
和 task_3
完成后使用这个)
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5] >> task_6
task_3 >> [task_4, task_5]
或
(如果 task_2
完成后 task_4 应该 运行 并且 task_3
完成后 task_5
应该 运行 使用这个)
task_1 >> [task_2 , task_3]
task_2 >> task_4
task_3 >> task_5
[task_4, task_5] >> task_6
提示,您不需要进行以下操作:
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
您可以将dag
参数传递给您的任务本身,例如:
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
dag=dag,
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
或使用 DAG 作为上下文管理器,如 https://airflow.apache.org/docs/stable/concepts.html#context-manager and Point (1) in https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f
中所述
同样的问题,运行版本:1.10.14+composer GCP:
代码:
i = 0 # iterator
while i < len(LIST_OF_OPS): # list of operators which I have aggregated beforehand
LIST_OF_OPS[i] >> LIST_OF_OPS[i+1:i+8] # one task queues up seven more
i += 7 # in order not to start all of them in parallel, I increase the iterator to one less than the number of tasks started just now; a chain is formed.
这个解决方案允许我动态生成任务,并迭代它们。当前用于具有 280 个任务的工作流。
我是 Apache airflow 和 DAG 的新手。 DAG 中共有 6 个任务(task1、task2、task3、task4、task5、task6)。但是在 运行 DAG 时,我们收到以下错误。
>> 的 DAG 操作数类型不受支持:'list' 和 'list'
下面是我的 DAG 代码。请帮忙。我是气流新手。
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False
}
dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 14))
#################### CREATE TASK #####################################
task_1 = DatabricksSubmitRunOperator(
task_id='task_1',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_1/task_1.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_1.driver.TestClass1',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_2 = DatabricksSubmitRunOperator(
task_id='task_2',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_2/task_2.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_2.driver.TestClass2',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_3 = DatabricksSubmitRunOperator(
task_id='task_3',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_3/task_3.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_3.driver.TestClass3',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_4 = DatabricksSubmitRunOperator(
task_id='task_4',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_4/task_4.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_4.driver.TestClass4',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_5 = DatabricksSubmitRunOperator(
task_id='task_5',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_5/task_5.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_5.driver.TestClass5',
'parameters' : [
'json ={{ dag_run.conf.json }}'
]
}
)
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
#################### ORDER OF OPERATORS ###########################
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6
Airflow 任务依赖项无法处理 [list]>>[list]。解决此问题的最简单方法是在多行中指定您的依赖项:
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5]
task_3 >> [task_4, task_5]
[task_4 , task_5 ] >> task_6
您想要的任务相关性是什么?您是要 运行 task_4
仅在 task_2
之后还是在 task_2
和 task_3
根据该答案,使用以下选项之一:
(如果 task_4 应该 运行 在 task_2
和 task_3
完成后使用这个)
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5] >> task_6
task_3 >> [task_4, task_5]
或
(如果 task_2
完成后 task_4 应该 运行 并且 task_3
完成后 task_5
应该 运行 使用这个)
task_1 >> [task_2 , task_3]
task_2 >> task_4
task_3 >> task_5
[task_4, task_5] >> task_6
提示,您不需要进行以下操作:
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
您可以将dag
参数传递给您的任务本身,例如:
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
dag=dag,
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
或使用 DAG 作为上下文管理器,如 https://airflow.apache.org/docs/stable/concepts.html#context-manager and Point (1) in https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f
中所述同样的问题,运行版本:1.10.14+composer GCP:
代码:
i = 0 # iterator
while i < len(LIST_OF_OPS): # list of operators which I have aggregated beforehand
LIST_OF_OPS[i] >> LIST_OF_OPS[i+1:i+8] # one task queues up seven more
i += 7 # in order not to start all of them in parallel, I increase the iterator to one less than the number of tasks started just now; a chain is formed.
这个解决方案允许我动态生成任务,并迭代它们。当前用于具有 280 个任务的工作流。