Dask Using SQLAlchemy join as table for dask.dataframe.read_sql - index_col can't make both pandas and dask happy
I created a SQLAlchemy sqlalchemy.sql.selectable.Join object so that I can pull a join of several tables into a dask dataframe.
Join definition:
joined = TABLE1.join(TABLE2, TABLE1.c.COL1 == TABLE2.c.COL2)
joined = joined.outerjoin(TABLE3, TABLE1.c.COL1 == TABLE3.c.COL3)
joined = joined.outerjoin(TABLE4, TABLE1.c.COL1 == TABLE4.c.COL4)
joined = joined.join(TABLE5, TABLE1.c.COL5 == TABLE5.c.COL6)
joined = joined.outerjoin(TABLE6, TABLE5.c.COL7 == TABLE6.c.COL8)
joined = joined.outerjoin(TABLE7, TABLE6.c.COL9 == TABLE7.c.COL10)
If I read a subset directly into pandas like this, it works:
pd_df_join = pd.read_sql_query(
join.select().limit(10000).compile(engine, compile_kwargs={'literal_binds': True}).string, engine, index_col='COL1')
However, if I try to do the same thing with dask, I get one of two errors: either Pandas or Dask cannot find the column I am referring to.
Pandas:
In[15]: dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-15-d824e7a80ef7>", line 1, in <module>
dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 137, in read_sql_table
head = pd.read_sql(q, engine, **kwargs)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 438, in read_sql
chunksize=chunksize,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 1237, in read_query
parse_dates=parse_dates,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 129, in _wrap_result
frame.set_index(index_col, inplace=True)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/core/frame.py", line 4303, in set_index
raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['SCHEMANAME_TABLE1_COL1'] are in the columns"
Dask:
In[16]: dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-16-b639079b01cd>", line 1, in <module>
dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 110, in read_sql_table
index = table.columns[index_col] if isinstance(index_col, str) else index_col
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/sqlalchemy/util/_collections.py", line 194, in __getitem__
return self._data[key]
KeyError: 'COL1'
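I'm not sure whether there is a way around this, or whether I am doing something wrong. Any help is greatly appreciated!
Your input should consist of SQL expressions only; do not compile them, and do not mix expressions with strings.
In this case, it would probably look like this: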
pd_df_join = dd.read_sql_table(join, engine_uri, index_col=TABLE1.c.COL1)
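To see why neither string works, it may help to compare the keys of the Join's column collection with the name carried by the Column object itself. The following is only a minimal sketch: the two tables are hypothetical stand-ins for TABLE1 and TABLE2 (single integer columns, schema SCHEMANAME assumed from the traceback), and it uses SQLAlchemy alone, without dask or a database connection.

```python
from sqlalchemy import Column, Integer, MetaData, Table

metadata = MetaData()

# Hypothetical stand-ins for TABLE1 and TABLE2 from the question.
table1 = Table(
    "TABLE1", metadata,
    Column("COL1", Integer, primary_key=True),
    schema="SCHEMANAME",
)
table2 = Table(
    "TABLE2", metadata,
    Column("COL2", Integer, primary_key=True),
    schema="SCHEMANAME",
)

joined = table1.join(table2, table1.c.COL1 == table2.c.COL2)

# The Join's column collection is keyed by schema_table_column labels,
# so looking up the bare string 'COL1' in it raises KeyError.
join_keys = list(joined.c.keys())
print(join_keys)

# The Column object itself still carries the bare name, which is the
# label pandas sees in the query result.
index_column = table1.c.COL1
print(index_column.name)
```

Passing the Column object as index_col avoids choosing between the two spellings, since it does not depend on a string lookup in the Join's column collection.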