Airflow Dynamic Task Creation Stream Setting Not Working
I have a complex DAG that basically repeats its stream six times, once for each of six different sources, so I've been using a for loop to dynamically create my streams, like this (small example):
sources = ['source_1', 'source_2', 'source_3', 'source_4', 'source_5', 'source_6']

for source in sources:
    source_task_1 = PythonOperator(
        task_id=source + '_create_emr',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_arl_source_emr_creation,
        op_args=[source])

    source_task_2 = BashOperator(
        task_id=source + '_starting_step_1',
        retries=10,
        bash_command='echo "Starting step 1 for ' + source + '"',
        dag=dag)

    source_task_2.set_upstream(source_task_1)
All of the tasks are created successfully in the DAG, because I can see them in the Airflow UI, but the strange thing is that it only links up the tasks in the stream for the first occurrence in the loop (source_1).
All of the other tasks have no upstream or downstream. I don't understand how that's possible; if the first occurrence in the loop works, shouldn't they all?
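(One way to confirm this, besides the Graph view, is to print the relationship lists straight off the DAG object; a minimal sketch against the small example above, not part of my real DAG:)

    # Hypothetical check, using the task ids from the small example above
    for s in sources:
        t1 = dag.get_task(s + '_create_emr')
        t2 = dag.get_task(s + '_starting_step_1')
        # For a correctly wired stream these two should reference each other
        print(t1.task_id, [t.task_id for t in t1.downstream_list])
        print(t2.task_id, [t.task_id for t in t2.upstream_list])

Only the source_1 pair shows the expected references; the rest come back empty.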
Here is my actual code (it's a very large DAG, so I'm only showing the tasks and not the Python callables I use inside the tasks...):
def create_emr_step_3_subdag(main_dag, subdag_id, source):
    subdag = DAG('{0}.{1}'.format(main_dag.dag_id, subdag_id), default_args=args)

    source_run_emr_step_3 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_3',
        dag=subdag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_3,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_3_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_3_waiter',
        dag=subdag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_3_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_3_waiter.set_upstream(source_run_emr_step_3)

    return subdag


class DatalakeDigitalPlatformArlWorkflowSource:
    sourceShortName = None  # source_1, source_2, source_3, source_4, source_5, source_6
    sourceFullName = None   # SOURCE_1, SOURCE_2, SOURCE_3, SOURCE_4, SOURCE_5, SOURCE_6

    def getSourceShortName(self):
        return self.sourceShortName

    def setSourceShortName(self, sourceShortName):
        self.sourceShortName = sourceShortName

    def getSourceFullName(self):
        return self.sourceFullName

    def setSourceFullName(self, sourceFullName):
        self.sourceFullName = sourceFullName
source_1 = DatalakeDigitalPlatformArlWorkflowSource()
source_1.setSourceShortName("source_1")
source_1.setSourceFullName("SOURCE_1")
source_2 = DatalakeDigitalPlatformArlWorkflowSource()
source_2.setSourceShortName("source_2")
source_2.setSourceFullName("HZN")
source_3 = DatalakeDigitalPlatformArlWorkflowSource()
source_3.setSourceShortName("source_3")
source_3.setSourceFullName("SOURCE_3")
source_4 = DatalakeDigitalPlatformArlWorkflowSource()
source_4.setSourceShortName("source_4")
source_4.setSourceFullName("SOURCE_4")
source_5 = DatalakeDigitalPlatformArlWorkflowSource()
source_5.setSourceShortName("source_5")
source_5.setSourceFullName("PP")
source_6 = DatalakeDigitalPlatformArlWorkflowSource()
source_6.setSourceShortName("source_6")
source_6.setSourceFullName("SOURCE_6")
sources = [source_1, source_2, source_3, source_4, source_5, source_6]
for source in sources:
    source_create_emr_task_id = source.sourceFullName + '_create_emr'

    source_create_emr = PythonOperator(
        task_id=source_create_emr_task_id,
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_blah_source_emr_creation,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_starting_step_1
    source_starting_step_1 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_1',
        retries=10,
        bash_command='echo "Starting step 1 for ' + source.sourceShortName + '"',
        dag=dag)

    # Get source Batch ID
    source_get_batch_id = PythonOperator(
        task_id=source.sourceFullName + '_get_batch_id',
        retries=10,
        dag=dag,
        python_callable=get_batch_id,
        op_args=[airflow_home + '/resources/batch-id-inputs/batchid_input.json', source.sourceFullName])

    # source_licappts
    source_sensor_licappts = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_licappts',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_licappts_(.*).txt',
        poke_interval=3,
        execution_timeout=timedelta(hours=23),
        dag=dag)

    source_process_licappts = PythonOperator(
        task_id=source.sourceFullName + '_process_licappts',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_licappts_(.*).txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_agents
    source_sensor_agents = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_agents',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_agents_(.*).txt',
        poke_interval=3,
        dag=dag)

    source_process_agents = PythonOperator(
        task_id=source.sourceFullName + '_process_agents',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_agents_*.txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_agentpolicy
    source_sensor_agentpolicy = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_agentpolicy',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_agentpolicy_(.*).txt',
        poke_interval=3,
        dag=dag)

    source_process_agentpolicy = PythonOperator(
        task_id=source.sourceFullName + '_process_agentpolicy',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_agentpolicy_*.txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_finished_step_1
    source_finished_step_1 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_1',
        retries=10,
        bash_command='echo "Finished step 1 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_2
    source_starting_step_2 = BashOperator(
        task_id=source.sourceFullName + '_source_starting_step_2',
        retries=10,
        bash_command='echo "Starting step 2 for ' + source.sourceShortName + '"',
        dag=dag)

    source_run_emr_step_2 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_2',
        dag=dag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_2,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_2_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_2_waiter',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_2_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_elastic_search_check
    source_elastic_search_check = PythonOperator(
        task_id=source.sourceFullName + '_elastic_search_check',
        retries=10,
        dag=dag,
        python_callable=execute_get_advisor_batch_stage_status,
        op_args=['get' + source.sourceFullName + 'BatchId', source.sourceFullName])

    # source_finished_step_2
    source_finished_step_2 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_2',
        retries=10,
        bash_command='echo "Finished step 2 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_3
    source_starting_step_3 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_3',
        retries=10,
        bash_command='echo "Starting step 3 for ' + source.sourceShortName + '"',
        dag=dag)

    source_emr_step_3_subdag_task_id = source.sourceFullName + '_emr_step_3_subdag'

    source_emr_step_3_subdag = SubDagOperator(
        task_id=source_emr_step_3_subdag_task_id,
        dag=dag,
        retries=10,
        pool='entitymatching_task_pool',
        subdag=create_emr_step_3_subdag(dag, source_emr_step_3_subdag_task_id, source)
    )

    # source_finished_step_3
    source_finished_step_3 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_3',
        retries=10,
        bash_command='echo "Finished step 3 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_4
    source_starting_step_4 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_4',
        retries=10,
        bash_command='echo "Starting step 4 for ' + source.sourceShortName + '"',
        dag=dag)

    source_run_emr_step_4 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_4',
        dag=dag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_4,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_4_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_4_waiter',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_4_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_finished_step_4
    source_finished_step_4 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_4',
        retries=10,
        bash_command='echo "Finished step 4 for ' + source.sourceShortName + '"',
        dag=dag)

    source_emr_termination = PythonOperator(
        task_id=source.sourceFullName + '_emr_termination',
        dag=dag,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(minutes=5),
        python_callable=execute_emr_termination,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_successful
    source_successful = BashOperator(
        task_id=source.sourceFullName + '_successful',
        retries=10,
        bash_command='sudo aws sns publish blah blah blah',
        dag=dag)

    # finished_foo_bar_blah_workflow
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)

    ### Stream ###

    # Create EMR Cluster
    source_create_emr.set_upstream(starting_foo_bar_blah_workflow)

    # Step 1
    source_starting_step_1.set_upstream(starting_foo_bar_blah_workflow)
    source_get_batch_id.set_upstream(source_starting_step_1)
    source_sensor_licappts.set_upstream(source_get_batch_id)
    source_process_licappts.set_upstream(source_sensor_licappts)
    source_sensor_agents.set_upstream(source_get_batch_id)
    source_process_agents.set_upstream(source_sensor_agents)
    source_sensor_agentpolicy.set_upstream(source_get_batch_id)
    source_process_agentpolicy.set_upstream(source_sensor_agentpolicy)
    source_finished_step_1.set_upstream(source_process_licappts)
    source_finished_step_1.set_upstream(source_process_agents)
    source_finished_step_1.set_upstream(source_process_agentpolicy)

    # Step 2
    source_starting_step_2.set_upstream(source_finished_step_1)
    source_starting_step_2.set_upstream(source_create_emr)  # Don't run EMR steps until the EMR is created
    source_run_emr_step_2.set_upstream(source_starting_step_2)
    source_run_emr_step_2_waiter.set_upstream(source_run_emr_step_2)
    source_elastic_search_check.set_upstream(source_run_emr_step_2_waiter)
    source_finished_step_2.set_upstream(source_elastic_search_check)

    # Step 3
    source_starting_step_3.set_upstream(source_finished_step_2)
    source_emr_step_3_subdag.set_upstream(source_starting_step_3)
    source_finished_step_3.set_upstream(source_emr_step_3_subdag)

    # Step 4
    source_starting_step_4.set_upstream(source_finished_step_3)
    source_run_emr_step_4.set_upstream(source_starting_step_4)
    source_run_emr_step_4_waiter.set_upstream(source_run_emr_step_4)
    source_finished_step_4.set_upstream(source_run_emr_step_4_waiter)

    # Terminate EMR Cluster
    source_emr_termination.set_upstream(source_finished_step_4)
    source_successful.set_upstream(source_emr_termination)

    finished_foo_bar_blah_workflow.set_upstream(source_successful)
As you can see, the stream is not being set up correctly.
Before my most recent changes to the file it worked fine, as can be seen here.
I just did a major refactoring of my code, and when I reloaded it I saw this error. I'm not sure what I did; I did a lot of find + replace-all renaming, and I wonder if I messed something up in the process and simply can't spot the mistake in the code. But what makes me think that isn't the problem is this: if it were, why would my first source still link up correctly in its stream?
Is it possible I'm hitting some kind of limit on the number of tasks I can have in a single DAG?
I think I found your mistake:
First, to rule out a bug in Airflow itself, I created a small DAG that creates 25 tasks for 7 sources and sets their upstreams, and everything worked as it should.
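For reference, the test looked roughly like this (a minimal sketch; the task ids, counts and bash commands are made up for the test, they are not your code):

    test_sources = ['s1', 's2', 's3', 's4', 's5', 's6', 's7']
    for s in test_sources:
        previous = None
        for i in range(25):
            # Chain dummy tasks per source, setting upstreams the same way you do
            task = BashOperator(
                task_id='{0}_task_{1}'.format(s, i),
                bash_command='echo "{0} step {1}"'.format(s, i),
                dag=dag)
            if previous is not None:
                task.set_upstream(previous)
            previous = task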
So I took your code and tried it, and I ran into exactly the same problem you're seeing.
Then I commented out all of the steps where you set upstreams and added them back one by one. Everything worked fine until the very last line:
finished_foo_bar_blah_workflow.set_upstream(source_successful)
So I had a look at the task finished_foo_bar_blah_workflow, and as far as I can tell this task only needs to be created once, not once per source. So I moved this code:
# finished_foo_bar_blah_workflow
finished_foo_bar_blah_workflow = BashOperator(
    task_id='finished_foo_bar_blah_workflow',
    bash_command='echo "Finished foo_bar_blah_workflow"',
    dag=dag)
above the line for source in sources: and, voilà, everything worked.
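In other words, the working structure looks like this (a sketch, showing only the relevant parts):

    # Created exactly once, above the loop
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)

    for source in sources:
        # ... all of the per-source tasks and set_upstream calls stay as they are ...
        finished_foo_bar_blah_workflow.set_upstream(source_successful)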
Edit

So I had a look at the upstream and downstream lists of the tasks that should be directly upstream of finished_foo_bar_blah_workflow. Before I moved the code that creates that task, finished_foo_bar_blah_workflow had just one task in its upstream_list (afterwards it correctly contained 7 tasks), while all of the tasks that should be directly upstream of it referenced it in their downstream_list, and their upstream_list contained the tasks that should be there, too. So this might be a bug concerning the creation of tasks with the same task_id multiple times.
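If you want to poke at it in isolation, the thing to reproduce is creating two operators with the same task_id in one DAG (a minimal, illustrative sketch; behaviour differs by Airflow version, and newer releases raise DuplicateTaskIdFound instead of accepting the second task):

    a = BashOperator(task_id='only_once', bash_command='echo 1', dag=dag)
    b = BashOperator(task_id='downstream', bash_command='echo 2', dag=dag)
    b.set_upstream(a)

    # Same task_id again. On the older versions this question seems to target, the
    # second object appears to take over the task_id, so the relationships set on
    # the first instance are no longer what dag.get_task('only_once') reports.
    a2 = BashOperator(task_id='only_once', bash_command='echo 1 again', dag=dag)
    print([t.task_id for t in dag.get_task('only_once').downstream_list])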