Dask client + read_sql_table: distributed.protocol.core - CRITICAL - Failed to Serialize

I am trying to run read_sql_table with a dask.distributed Client, where the table is a query because I do not want to fetch the entire table. An example is below.

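import dask.dataframe as dd
from dask.distributed import Client
from sqlalchemy import select
from sqlalchemy.orm import aliased

client = Client()
# start_dt and end_dt are the datetime bounds of the query, defined elsewhere.
# Parentheses replace the backslash continuations, which cannot be followed by comments.
sql = (
    select([schema.datetime,     # DateTime format
            schema.id])          # UUID
    .select_from(schema)         # schema class (see below)
    .where(schema.datetime.between(start_dt, end_dt))
)
table = aliased(sql, name='table')
response = dd.read_sql_table(table=table, uri='uri', schema='schema', index_col='datetime')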

For print(response), I get the expected output:

Dask DataFrame Structure:
                        swing_id
npartitions=1                   
2020-08-03 07:16:59.590   object
2020-08-29 23:34:44.980      ...
Dask Name: from-delayed, 2 tasks

However, as soon as I start using the dask.dataframe, for example with print(response.head()), I get a Failed to Serialize error:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
    data = {
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
    key: serialize(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
  File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
    protocol.dumps(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
    data = {
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
    key: serialize(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - WARNING - Error in batched write, retrying
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
    data = {
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
    key: serialize(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
  File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
    protocol.dumps(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
    data = {
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
    key: serialize(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File ".\.conda\envs\project-name\lib\site-packages\distributed\batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\tcp.py", line 223, in write
    frames = await to_frames(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 54, in to_frames
    return _to_frames()
  File ".\.conda\envs\project-name\lib\site-packages\distributed\comm\utils.py", line 34, in _to_frames
    protocol.dumps(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 50, in dumps
    data = {
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\core.py", line 51, in <dictcomp>
    key: serialize(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\protocol\serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function safe_head at 0x00000000053B88B0>, (<function check_meta at 0x0000000005387EE0>, (<function apply at 0x0000000002AB3280>, <function _read_sql_chunk at 0x0000000005450AF0>, [<sqlalchemy.sql.selectable.Select at 0x99f8ca0; Select object>, 'postgresql://tirisdspgadmin@tirisdspgserver:wsiNTobyx8opPo2imfCB@tirisdspgserver.postgres.database.azure.com/mav', Empty DataFrame\nColumns: [swing_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'start_time']])), Empty DataFrame\nColumns: [swing_id]\nIndex: [], 'from_delayed'), 5)")
Traceback (most recent call last):
  File "./project-name/src/points_bulk_scripts/extract_features.py", line 24, in <module>
    print(swings.head())
  File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1006, in head
    return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
  File ".\.conda\envs\project-name\lib\site-packages\dask\dataframe\core.py", line 1039, in _head
    result = result.compute()
  File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File ".\.conda\envs\project-name\lib\site-packages\dask\base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1986, in gather
    return self.sync(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 832, in sync
    return sync(
  File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File ".\.conda\envs\project-name\lib\site-packages\distributed\utils.py", line 324, in f
    result[0] = yield future
  File ".\.conda\envs\project-name\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File ".\.conda\envs\project-name\lib\site-packages\distributed\client.py", line 1852, in _gather
    raise exc
concurrent.futures._base.CancelledError: ('head-1-5-from-delayed-97944bbb0c6f5129ccb48a1f7c36d62b', 0)

The strangest part is that if I do not create a client, the dataframe is computed correctly:

import dask.dataframe as dd
response = dd.read_sql_table(table=<SQLAlchemy query>, uri='uri', schema='schema', index_col='index')
print(response.head())

Also, if I read the entire table, without a query, it works with the client:

import dask.dataframe as dd
from dask.distributed import Client

client = Client()
response = dd.read_sql_table(table=<SQLAlchemy table>, uri='uri', schema='schema', index_col='index')
print(response.head())

Additional information

Versions:

dask                      2.30.0                     py_0
dask-core                 2.30.0                     py_0
dask-labextension         3.0.0                      py_0
python                    3.8.5                h5fd99cc_1

client.get_versions(check=True):

{
    'scheduler': {
        'host': {
            'python': '3.8.5.final.0',
            'python-bits': 64,
            'OS': 'Windows',
            'OS-release': '7',
            'machine': 'AMD64',
            'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
            'byteorder': 'little',
            'LC_ALL': 'None',
            'LANG': 'None'
        },
        'packages': {
            'python': '3.8.5.final.0',
            'dask': '2.30.0',
            'distributed': '2.30.1',
            'msgpack': '1.0.0',
            'cloudpickle': '1.6.0',
            'tornado': '6.0.4',
            'toolz': '0.11.1',
            'numpy': '1.19.2',
            'lz4': None,
            'blosc': None
        }
    },
    'workers': {
        'tcp://127.0.0.1:57813': {
            'host': {
                'python': '3.8.5.final.0',
                'python-bits': 64,
                'OS': 'Windows',
                'OS-release': '7',
                'machine': 'AMD64',
                'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
                'byteorder': 'little',
                'LC_ALL': 'None',
                'LANG': 'None'
            },
            'packages': {
                'python': '3.8.5.final.0',
                'dask': '2.30.0',
                'distributed': '2.30.1',
                'msgpack': '1.0.0',
                'cloudpickle': '1.6.0',
                'tornado': '6.0.4',
                'toolz': '0.11.1',
                'numpy': '1.19.2',
                'lz4': None,
                'blosc': None
            }
        }
    },
    'client': {
        'host': {
            'python': '3.8.5.final.0',
            'python-bits': 64,
            'OS': 'Windows',
            'OS-release': '7',
            'machine': 'AMD64',
            'processor': 'Intel64 Family 6 Model 94 Stepping 3, GenuineIntel',
            'byteorder': 'little',
            'LC_ALL': 'None',
            'LANG': 'None'
        },
        'packages': {
            'python': '3.8.5.final.0',
            'dask': '2.30.0',
            'distributed': '2.30.1',
            'msgpack': '1.0.0',
            'cloudpickle': '1.6.0',
            'tornado': '6.0.4',
            'toolz': '0.11.1',
            'numpy': '1.19.2',
            'lz4': None,
            'blosc': None
        }
    }
}

schema class

from sqlalchemy import Column, String, Integer, TIMESTAMP, Boolean
from sqlalchemy.ext.declarative import declarative_base


class Schema(declarative_base()):

    __tablename__ = "table_name"

    datetime = Column(TIMESTAMP, primary_key=True)
    id = Column(String, primary_key=True)

I believe the problem is this part of your query:

.select_from(schema)

where schema references a real database connection.
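If the failure really does come from an object in the task graph that cannot be pickled, a check like the one below can help narrow down which argument is responsible. This is only a sketch: it uses the standard-library pickle module, whereas distributed has its own serialization layer, so a pass here is not a guarantee. The helper name find_unpicklable and the sample values are hypothetical, and a threading.Lock stands in for anything tied to a live resource such as a connection.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return the names of the objects that pickle.dumps cannot serialize."""
    failed = []
    for name, obj in named_objects.items():
        try:
            pickle.dumps(obj)
        except Exception:  # pickle raises different error types depending on the object
            failed.append(name)
    return failed


# Hypothetical stand-ins for the arguments that end up in the task graph.
candidates = {
    "uri": "postgresql://user@host/db",
    "index_col": "datetime",
    "live_resource": threading.Lock(),  # placeholder for a connection-bound object
}
print(find_unpicklable(candidates))
```

Passing the real query object and the other read_sql_table arguments through the same helper would show whether the query is the part that fails.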

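You should instead use strings for the table, schema, and columns, so that the query is a fully abstract expression. For example, this is from the tests: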

from sqlalchemy import sql
query = sql.select([sql.column("number"), sql.column("name")]).select_from(
        sql.table("test")
    )

(I think