Airflow: Pickle Depth Recursion during XCOM Insert

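Hi, I am running a task with PythonOperator. The task itself appears to run fine, and the return value is what I expect (it is the large XML output of an API call). However, I get ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison. My Python callable returns a value, so I assume an XCOM push happens and it is trying to serialize the output so that downstream operators can consume it. I'm not sure how to fix this, though, because I don't see any configuration for a) increasing the recursion depth of the Pickle serializer (suggested here) or b) handling errors during the XCOM push.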
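A minimal, Airflow-free sketch of how the push can fail even though the callable itself succeeded: the return value is pickled before the INSERT into the xcom table, and pickle recurses once per level of nesting. The repeating save_reduce / save_dict / save_list frames in the traceback suggest the returned value is a nested object rather than a plain string. build_nested and pickled_size are hypothetical helpers for illustration only, not part of Airflow or the question's code.

```python
import pickle


def build_nested(depth):
    """Hypothetical stand-in for a deeply nested parsed-XML object:
    an empty list wrapped in `depth` further lists."""
    node = []
    for _ in range(depth):
        node = [node]
    return node


def pickled_size(obj):
    """Return the pickled size in bytes, or None if pickling hits the recursion limit."""
    try:
        return len(pickle.dumps(obj))
    except RecursionError:
        return None


# A large but flat string pickles without any deep recursion.
flat = "<QueryResult>" + "<Answer>12345</Answer>" * 100_000 + "</QueryResult>"
print(pickled_size(flat))

# A shallow nested object is also fine...
print(pickled_size(build_nested(10)))

# ...but nesting far deeper than the recursion limit makes pickle give up.
print(pickled_size(build_nested(100_000)))  # → None
```

In other words, returning the XML as a plain string (or something smaller, as in the workarounds below) keeps the serialized XCom value flat.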

My full traceback is below:

INFO - Subtask: [2017-11-08 14:00:14,545] {models.py:1342} INFO - Executing <Task(PythonOperator): test_task_xml> on 2017-11-07 00:00:00
INFO - Subtask: [2017-11-08 14:00:31,817] {python_operator.py:81} INFO - Done. Returned value was: <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>
INFO - Subtask: [2017-11-08 14:00:31,839] {models.py:1417} ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison [SQL: 'INSERT INTO xcom (key, value, timestamp, execution_date, task_id, dag_id) VALUES (%(key)s, %(value)s, now(), %(execution_date)s, %(task_id)s, %(dag_id)s) RETURNING xcom.id'] [parameters: [{'dag_id': 'test_dag', 'key': 'return_value', 'value': <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>, 'task_id': 'test_task_xml', 'execution_date': datetime.datetime(2017, 11, 7, 0, 0)}]]
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context
INFO - Subtask:     context = constructor(dialect, self, conn, *args)
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in _init_compiled
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in <genexpr>
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 1516, in process
INFO - Subtask:     value = dumps(value, protocol)
INFO - Subtask:     dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 274, in dump
INFO - Subtask:     pik.dump(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 409, in dump
INFO - Subtask:     self.save(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
INFO - Subtask:     StockPickler.save_dict(pickler, obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 821, in save_dict
INFO - Subtask:     self._batch_setitems(obj.items())
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 847, in _batch_setitems
INFO - Subtask:     save(v)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 781, in save_list
INFO - Subtask:     self._batch_appends(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 805, in _batch_appends
INFO - Subtask:     save(x)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict

It turns out that the database's BLOB (Binary Large Object) column sets a limit on pickling Python objects. To work around this, you can:

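  • Try Fileflow
  • Dump the data to a file in a temporary folder and push only the file path via XCOM
  • Handle the whole process in a single task and push only the specific values you need via XCOM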
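A minimal sketch of the last two workarounds, assuming Airflow 1.x's PythonOperator with provide_context=True, so the callable receives the task instance as context["ti"] and can call ti.xcom_pull. fetch_xml, extract_answer and the callable names are hypothetical; fetch_xml stands in for the real API call, and the Query/Answer path is taken from the sample output in the log.

```python
import os
import tempfile
import xml.etree.ElementTree as ET


def fetch_xml():
    """Hypothetical stand-in for the real API call that returns a large XML string."""
    return "<QueryResult><Query><Answer>12345</Answer></Query></QueryResult>"


def extract_answer(xml_text):
    """Pull the single value downstream tasks need out of the XML document."""
    return ET.fromstring(xml_text).findtext("Query/Answer")


# Workaround 2: write the payload to a temp file and return only its path,
# so the XCom row holds a short string instead of the whole document.
def dump_to_file(**context):
    fd, path = tempfile.mkstemp(suffix=".xml")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(fetch_xml())
    return path


def read_from_file(**context):
    # "test_task_xml" is the upstream task id from the question's log.
    path = context["ti"].xcom_pull(task_ids="test_task_xml")
    try:
        with open(path, encoding="utf-8") as f:
            return extract_answer(f.read())
    finally:
        os.remove(path)  # clean up once the file has been consumed


# Workaround 3: fetch and process in one task, returning (and pushing) only the value.
def fetch_and_extract(**context):
    return extract_answer(fetch_xml())


# Wiring in a DAG (Airflow 1.x; not executed here):
#   PythonOperator(task_id="test_task_xml", python_callable=dump_to_file,
#                  provide_context=True, dag=dag)
```

The file-path approach only works if both tasks can see the same filesystem (for example a single worker, or shared storage between workers); otherwise the single-task approach avoids the issue entirely.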