dask read_sql_table fails on sqlite table with numeric datetime

I've got some large sqlite tables that I need to read into dask dataframes. The tables have columns containing datetimes (ISO-formatted strings) that are stored as the sqlite NUMERIC data type. I am able to read this kind of data with pandas' read_sql_table. However, the same call from dask gives an error. Can someone suggest a good workaround? (I am not aware of an easy way to change the sqlite data type of these columns from NUMERIC to TEXT.) A minimal example is pasted below.

import sqlalchemy
import pandas as pd
import dask.dataframe as ddf

connString = "sqlite:///c:\temp\test.db"
engine = sqlalchemy.create_engine(connString)
conn = engine.connect()
conn.execute("create table testtable (uid integer Primary Key, datetime NUM)")
conn.execute("insert into testtable values (1, '2017-08-03 01:11:31')")
print(conn.execute('PRAGMA table_info(testtable)').fetchall())
conn.close()

pandasDF = pd.read_sql_table('testtable', connString, index_col='uid', parse_dates={'datetime':'%Y-%m-%d %H:%M:%S'})
pandasDF.head()

daskDF = ddf.read_sql_table('testtable', connString, index_col='uid',  parse_dates={'datetime':'%Y-%m-%d %H:%M:%S'})

Here is the traceback:

Warning (from warnings module):
  File "C:\Program Files\Python36\lib\site-packages\sqlalchemy\sql\sqltypes.py", line 596
    'storage.' % (dialect.name, dialect.driver))
SAWarning: Dialect sqlite+pysqlite does *not* support Decimal objects natively, and SQLAlchemy must convert from floating point - rounding errors and other issues may occur. Please consider storing Decimal numbers as strings or integers on this platform for lossless storage.
Traceback (most recent call last):
  File "<pyshell#2>", line 1, in <module>
    daskDF = ddf.read_sql_table('testtable', connString, index_col='uid',  parse_dates={'datetime':'%Y-%m-%d %H:%M:%S'})
  File "C:\Program Files\Python36\lib\site-packages\dask\dataframe\io\sql.py", line 98, in read_sql_table
    head = pd.read_sql(q, engine, **kwargs)
  File "C:\Program Files\Python36\lib\site-packages\pandas\io\sql.py", line 416, in read_sql
    chunksize=chunksize)
  File "C:\Program Files\Python36\lib\site-packages\pandas\io\sql.py", line 1104, in read_query
    parse_dates=parse_dates)
  File "C:\Program Files\Python36\lib\site-packages\pandas\io\sql.py", line 157, in _wrap_result
    coerce_float=coerce_float)
  File "C:\Program Files\Python36\lib\site-packages\pandas\core\frame.py", line 1142, in from_records
    coerce_float=coerce_float)
  File "C:\Program Files\Python36\lib\site-packages\pandas\core\frame.py", line 6304, in _to_arrays
    data = lmap(tuple, data)
  File "C:\Program Files\Python36\lib\site-packages\pandas\compat\__init__.py", line 129, in lmap
    return list(map(*args, **kwargs))
TypeError: must be real number, not str

EDIT: @mdurant's comment now makes me wonder whether this is a bug in sqlalchemy. The following code gives the same error message as pandas does:

import sqlalchemy as sa
from sqlalchemy import text

conn = engine.connect()  # reuse the engine created above; the earlier connection was closed

m = sa.MetaData()
table = sa.Table('testtable', m, autoload=True, autoload_with=engine)
resultList = conn.execute(sa.sql.select(table.columns).select_from(table)).fetchall()
print(resultList)

resultList2 = conn.execute(sa.sql.select(columns=[text('uid'), text('datetime')], from_obj=text('testtable'))).fetchall()
print(resultList2)

Traceback (most recent call last):

  File "<ipython-input-20-188c84a35d95>", line 1, in <module>
    print(resultList)

  File "c:\program files\python36\lib\site-packages\sqlalchemy\engine\result.py", line 156, in __repr__
    return repr(sql_util._repr_row(self))

  File "c:\program files\python36\lib\site-packages\sqlalchemy\sql\util.py", line 329, in __repr__
    ", ".join(trunc(value) for value in self.row),

TypeError: must be real number, not str
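
For comparison, querying the same file through the raw sqlite3 driver (a minimal sketch that bypasses SQLAlchemy's result-type machinery; it assumes the c:\temp\test.db file created above) hands the value back as a plain Python string, which suggests the problem lies in the NUMERIC result processing rather than in the stored data:

import sqlite3

# No type detection is requested, so the stored ISO string comes back as-is.
raw = sqlite3.connect(r'c:\temp\test.db')
print(raw.execute('select uid, "datetime" from testtable').fetchall())
# expected: [(1, '2017-08-03 01:11:31')]
raw.close()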

Puzzling. Here is some further information, in the hope that it leads to an answer.

The query being executed at the line in question is

pd.read_sql(sql.select(table.columns).select_from(table),
    engine, index_col='uid')

which fails as you show (the limit is not relevant here).

However, the text version of the same query

sql.select(table.columns).select_from(table).compile().string
    -> 'SELECT testtable.uid, testtable.datetime \nFROM testtable'
pd.read_sql('SELECT testtable.uid, testtable.datetime \nFROM testtable',
    engine, index_col='uid')  # works fine

The following workaround, using a cast in the query, does work (but is not pretty):

import sqlalchemy as sa
engine = sa.create_engine(connString)
m = sa.MetaData()
table = sa.Table('testtable', m, autoload=True, autoload_with=engine)
uid, dt = list(table.columns)
q = sa.select([dt.cast(sa.types.String)]).select_from(table)
daskDF = ddf.read_sql_table(q, connString, index_col=uid.label('uid'))

-EDIT-

A simpler form of this also appears to work (see comments):

daskDF = ddf.read_sql_table('testtable', connString, index_col='uid',
    columns=['uid', sa.sql.column('datetime').cast(sa.types.String).label('datetime')])
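
Since the cast hands 'datetime' back as plain strings, a conversion to real datetimes may still be wanted on the dask side. A minimal sketch (assuming daskDF came from the call above and the column kept the name 'datetime'):

import pandas as pd

# Parse the string column back into datetime64 values, partition by partition.
daskDF['datetime'] = daskDF['datetime'].map_partitions(
    pd.to_datetime, format='%Y-%m-%d %H:%M:%S',
    meta=('datetime', 'datetime64[ns]'))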