Airflow 1.9.0 ExternalTaskSensor retry_delay=30 yields TypeError: can't pickle _thread.RLock objects
Airflow 1.9.0 ExternalTaskSensor retry_delay=30 yields TypeError: can't pickle _thread.RLock objects
正如标题所说:在 Airflow 1.9.0 中,如果您在 ExternalTaskSensor 上使用 retry_delay=30(或任何其他整数)参数,DAG 可以正常运行,直到您想在 Airflow GUI 中清除任务实例为止——此时它会返回以下错误:"TypeError: can't pickle _thread.RLock objects"(以及一条漂亮的 Oops 消息)
但是如果你使用 retry_delay=timedelta(seconds=30) 清除任务实例工作正常。
如果我查看 models.py 方法,深层复制应该没问题,所以我觉得这很奇怪。我是不是遗漏了什么,或者这是一个错误?
您可以在下面找到一个最小的 DAG 示例。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
# Minimal DAG used to reproduce the problem when clearing a task instance.
dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"  # cron expression: minute 0 of every hour

# Arguments handed to the DAG as default_args.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2018, 1, 1),
    email=[],
    email_on_failure=False,
    email_on_retry=False,
)

test_dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=schedule_interval,
    catchup=False,
    max_active_runs=1,
)

# NOTE: retry_delay is deliberately a plain int here (not a timedelta);
# this is the value under investigation, so it must stay as written.
ets = ExternalTaskSensor(
    task_id="test_external_task_sensor",
    dag=test_dag,
    external_dag_id="dependent_dag_id",
    external_task_id="dependent_task_id",
    soft_fail=False,
    timeout=10,
    poke_interval=1,
    retries=0,
    retry_delay=30,
)

dummy_task = DummyOperator(task_id="collection_task", dag=test_dag)

# Same relationship as `dummy_task << ets`: the sensor is upstream of the dummy task.
dummy_task.set_upstream(ets)
编辑:根据要求,附上堆栈跟踪:
Ooops.
____/ ( ( ) ) \___
/( ( ( ) _ )) ) )\
(( ( )( ) ) ( ) )
((/ ( _( ) ( _) ) ( () ) )
( ( ( (_) (( ( ) .((_ ) . )_
( ( ) ( ( ) ) ) . ) ( )
( ( ( ( ) ( _ ( _) ). ) . ) ) ( )
( ( ( ) ( ) ( )) ) _)( ) ) )
( ( ( \ ) ( (_ ( ) ( ) ) ) ) )) ( )
( ( ( ( (_ ( ) ( _ ) ) ( ) ) )
( ( ( ( ( ) (_ ) ) ) _) ) _( ( )
(( ( )( ( _ ) _) _(_ ( (_ )
(_((__(_(__(( ( ( | ) ) ) )_))__))_)___)
((__) \||lll|l||/// \_))
( /(/ ( ) ) )\ )
( ( ( ( | | ) ) )\ )
( /(| / ( )) ) ) )) )
( ( ((((_(|)_))))) )
( ||\(|(|)|/|| )
( |(||(||)|||| )
( //|/l|||)|\ \ )
(/ / // /|//||||\ \ \ \ _)
-------------------------------------------------------------------------------
Node: jb-VirtualBox
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
raise value
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/utils.py", line 262, in wrapper
return f(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/utils.py", line 309, in wrapper
return f(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/views.py", line 989, in clear
include_upstream=upstream)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 3527, in sub_dag
dag = copy.deepcopy(self)
File "/usr/lib/python3.6/copy.py", line 161, in deepcopy
y = copier(memo)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 3512, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 161, in deepcopy
y = copier(memo)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 2437, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 215, in _deepcopy_list
append(deepcopy(a, memo))
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread.RLock objects
再看这个问题,文档上明确说 retry_delay 应该是 timedelta。
因此,如果您为 retry_delay 输入的是整数而不是 timedelta,DAG 还能运行其实只是侥幸。
在models.py中,BaseOperator:
:param retry_delay: delay between retries
:type retry_delay: timedelta
正如标题所说:在 Airflow 1.9.0 中,如果您在 ExternalTaskSensor 上使用 retry_delay=30(或任何其他整数)参数,DAG 可以正常运行,直到您想在 Airflow GUI 中清除任务实例为止——此时它会返回以下错误:"TypeError: can't pickle _thread.RLock objects"(以及一条漂亮的 Oops 消息)。但是如果您使用 retry_delay=timedelta(seconds=30),清除任务实例就能正常工作。
如果我查看 models.py 方法,深层复制应该没问题,所以我觉得这很奇怪。我是不是遗漏了什么,或者这是一个错误?
您可以在下面找到一个最小的 DAG 示例。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
# Minimal DAG used to reproduce the problem when clearing a task instance.
dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"  # cron expression: minute 0 of every hour

# Arguments handed to the DAG as default_args.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2018, 1, 1),
    email=[],
    email_on_failure=False,
    email_on_retry=False,
)

test_dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=schedule_interval,
    catchup=False,
    max_active_runs=1,
)

# NOTE: retry_delay is deliberately a plain int here (not a timedelta);
# this is the value under investigation, so it must stay as written.
ets = ExternalTaskSensor(
    task_id="test_external_task_sensor",
    dag=test_dag,
    external_dag_id="dependent_dag_id",
    external_task_id="dependent_task_id",
    soft_fail=False,
    timeout=10,
    poke_interval=1,
    retries=0,
    retry_delay=30,
)

dummy_task = DummyOperator(task_id="collection_task", dag=test_dag)

# Same relationship as `dummy_task << ets`: the sensor is upstream of the dummy task.
dummy_task.set_upstream(ets)
编辑:根据要求,附上堆栈跟踪:
Ooops.
____/ ( ( ) ) \___
/( ( ( ) _ )) ) )\
(( ( )( ) ) ( ) )
((/ ( _( ) ( _) ) ( () ) )
( ( ( (_) (( ( ) .((_ ) . )_
( ( ) ( ( ) ) ) . ) ( )
( ( ( ( ) ( _ ( _) ). ) . ) ) ( )
( ( ( ) ( ) ( )) ) _)( ) ) )
( ( ( \ ) ( (_ ( ) ( ) ) ) ) )) ( )
( ( ( ( (_ ( ) ( _ ) ) ( ) ) )
( ( ( ( ( ) (_ ) ) ) _) ) _( ( )
(( ( )( ( _ ) _) _(_ ( (_ )
(_((__(_(__(( ( ( | ) ) ) )_))__))_)___)
((__) \||lll|l||/// \_))
( /(/ ( ) ) )\ )
( ( ( ( | | ) ) )\ )
( /(| / ( )) ) ) )) )
( ( ((((_(|)_))))) )
( ||\(|(|)|/|| )
( |(||(||)|||| )
( //|/l|||)|\ \ )
(/ / // /|//||||\ \ \ \ _)
-------------------------------------------------------------------------------
Node: jb-VirtualBox
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
raise value
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/utils.py", line 262, in wrapper
return f(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/utils.py", line 309, in wrapper
return f(*args, **kwargs)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/www/views.py", line 989, in clear
include_upstream=upstream)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 3527, in sub_dag
dag = copy.deepcopy(self)
File "/usr/lib/python3.6/copy.py", line 161, in deepcopy
y = copier(memo)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 3512, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 161, in deepcopy
y = copier(memo)
File "/home/jb/Documents/p3_cdc_data_flow/lib/python3.6/site-packages/airflow/models.py", line 2437, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 215, in _deepcopy_list
append(deepcopy(a, memo))
File "/usr/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/lib/python3.6/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread.RLock objects
再看这个问题,文档上明确说 retry_delay 应该是 timedelta。因此,如果您为 retry_delay 输入的是整数而不是 timedelta,DAG 还能运行其实只是侥幸。
在 models.py 中,BaseOperator:
:param retry_delay: delay between retries
:type retry_delay: timedelta