Dask 客户端 + read_sql_table:distributed.protocol.core - CRITICAL - Failed to Serialize(序列化失败)
Dask client + read_sql_table: distributed.protocol.core - CRITICAL - Failed to Serialize
我尝试在使用 dask.distributed 的 Client 的情况下运行 read_sql_table,其中 table 参数是一个查询,因为我不想读取整张表。示例如下。
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
sql = select([schema.datetime, # DateTime format
schema.id]) \ # UUID
.select_from(schema) \ # schema class (see below)
.where(schema.datetime.between(start_dt, end_dt))
table = aliased(sql, name='table')
response = dd.read_sql_table(table=table, uri='uri', schema='schema', index_col='datetime')
执行 print(response) 时,我得到了预期的输出:
Dask DataFrame Structure:
swing_id
npartitions=1
2020-08-03 07:16:59.590 object
2020-08-29 23:34:44.980 ...
Dask Name: from-delayed, 2 tasks
然而,当我开始实际使用这个 dask.dataframe(例如执行 print(response.head()))时,就会得到 Failed to Serialize 错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\tcp.py", line 223, in write
frames = await to_frames(
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 54, in to_frames
return _to_frames()
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File "./project-name/src/points_bulk_scripts/extract_features.py", line 24, in <module>
print(swings.head())
File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1006, in head
return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1039, in _head
result = result.compute()
File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1986, in gather
return self.sync(
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 832, in sync
return sync(
File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 340, in sync
raise exc.with_traceback(tb)
File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 324, in f
result[0] = yield future
File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1852, in _gather
raise exc
concurrent.futures._base.CancelledError: ('head-1-5-from-delayed-97944bbb0c6f5129ccb48a1f7c36d62b', 0)
最奇怪的是,如果我不创建 client,数据帧就能被正确计算:
import dask.dataframe as dd
response = dd.read_sql_table(table=<SQLAlchemy query>, uri='uri', schema='schema', index_col='index')
print(response.head())
此外,如果我直接读取整张表(而不使用查询),那么即使创建了 client 也能正常工作:
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
response = dd.read_sql_table(table=<SQLAlchemy table>, uri='uri', schema='schema', index_col='index')
print(response.head())
附加信息
版本:
dask 2.30.0 py_0
dask-core 2.30.0 py_0
dask-labextension 3.0.0 py_0
python 3.8.5 h5fd99cc_1
client.get_versions(check=True)
:
{
'scheduler': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
},
'workers': {
'tcp://127.0.0.1:57813': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
}
},
'client': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
}
}
schema 类:
from sqlalchemy import Column, String, Integer, TIMESTAMP, Boolean
from sqlalchemy.ext.declarative import declarative_base
class Schema(declarative_base()):
__tablename__ = "table_name"
datetime = Column(TIMESTAMP, primary_key=True)
id = Column(String, primary_key=True)
我认为问题出在您查询中的这一部分:
.select_from(schema)
其中 schema 引用了真实的数据库连接,因此这个查询对象无法被序列化并发送给工作进程。
您应该改用字符串形式的表名、模式名和列名来构造一个完全抽象的表达式。例如,下面这段代码来自 dask 的测试:
from sqlalchemy import sql
query = sql.select([sql.column("number"), sql.column("name")]).select_from(
sql.table("test")
)
(我觉得
尝试使用 dask.distributed Client
运行 read_sql_table
,其中 table 是一个查询,因为我不想获取整个 table .示例如下。
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
sql = select([schema.datetime, # DateTime format
schema.id]) \ # UUID
.select_from(schema) \ # schema class (see below)
.where(schema.datetime.between(start_dt, end_dt))
table = aliased(sql, name='table')
response = dd.read_sql_table(table=table, uri='uri', schema='schema', index_col='datetime')
对于 print(response)
,我得到了预期的输出:
Dask DataFrame Structure:
swing_id
npartitions=1
2020-08-03 07:16:59.590 object
2020-08-29 23:34:44.980 ...
Dask Name: from-delayed, 2 tasks
然而,当我开始像 print(response.head())
一样使用 dask.dataframe
时,我得到一个 Failed to Serialize
错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File ".\.conda\envs\project-name\lib\site-packages\distributed\batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\tcp.py", line 223, in write
frames = await to_frames(
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 54, in to_frames
return _to_frames()
File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
protocol.dumps(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
data = {
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
key: serialize(
File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
File "./project-name/src/points_bulk_scripts/extract_features.py", line 24, in <module>
print(swings.head())
File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1006, in head
return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1039, in _head
result = result.compute()
File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1986, in gather
return self.sync(
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 832, in sync
return sync(
File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 340, in sync
raise exc.with_traceback(tb)
File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 324, in f
result[0] = yield future
File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
value = future.result()
File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1852, in _gather
raise exc
concurrent.futures._base.CancelledError: ('head-1-5-from-delayed-97944bbb0c6f5129ccb48a1f7c36d62b', 0)
最奇怪的是,如果我不创建 client
,数据帧就能被正确计算:
import dask.dataframe as dd
response = dd.read_sql_table(table=<SQLAlchemy query>, uri='uri', schema='schema', index_col='index')
print(response.head())
此外,如果我读取整个 table 而不使用查询,那么在使用客户端时也能正常工作:
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
response = dd.read_sql_table(table=<SQLAlchemy table>, uri='uri', schema='schema', index_col='index')
print(response.head())
附加信息
版本:
dask 2.30.0 py_0
dask-core 2.30.0 py_0
dask-labextension 3.0.0 py_0
python 3.8.5 h5fd99cc_1
client.get_versions(check=True)
:
{
'scheduler': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
},
'workers': {
'tcp://127.0.0.1:57813': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
}
},
'client': {
'host': {
'python': '3.8.5.final.0',
'python-bits': 64,
'OS': 'Windows',
'OS-release': '7',
'machine': 'AMD64',
'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
'byteorder': 'little',
'LC_ALL': 'None',
'LANG': 'None'
},
'packages': {
'python': '3.8.5.final.0',
'dask': '2.30.0',
'distributed': '2.30.1',
'msgpack': '1.0.0',
'cloudpickle': '1.6.0',
'tornado': '6.0.4',
'toolz': '0.11.1',
'numpy': '1.19.2',
'lz4': None,
'blosc': None
}
}
}
schema
class
from sqlalchemy import Column, String, Integer, TIMESTAMP, Boolean
from sqlalchemy.ext.declarative import declarative_base
class Schema(declarative_base()):
__tablename__ = "table_name"
datetime = Column(TIMESTAMP, primary_key=True)
id = Column(String, primary_key=True)
我认为问题出在您查询的这一部分:
.select_from(schema)
其中 schema
引用了真实的数据库连接,因此在发送给 worker 时无法被序列化。
您应该改用字符串形式的表名、模式名和列名来构造完全抽象的表达式。例如,下面的代码来自 dask 的测试:
from sqlalchemy import sql
query = sql.select([sql.column("number"), sql.column("name")]).select_from(
sql.table("test")
)
(我觉得