如何运行 Dask Distributed 上的SQLAlchemy 查询?
How to Run SQLAlchemy Query on Dask Distributed?
我正在尝试 运行 并使用我设置的 dask 集群并行化此 sqlalchemy 查询,因为我没有足够的内存来从我的本地计算机执行它。
我的代码如下 - 我不确定这是否是完成此操作的最佳方法:
from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)
recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']
query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]
from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
connect_args={'protocol': 'https',
'requests_kwargs': {'verify': key}})
con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])
我收到以下错误:
TypeError: can't pickle _thread.RLock objects
您应该使用专为此目的而设计的功能read_sql_table
。如果您阅读文档字符串 and/or 代码,您会看到传递给工作人员的是查询本身,工作人员在本地创建自己的引擎实例。这是因为 sqlalchemy 实例的状态无法在工作人员之间发送,正如您所发现的。
请注意,read_sql_table
也关心数据的分区,因为这是 Dask,重点是处理大于内存的数据。在您的示例中,我猜 index/partitioning 列是 date
,并且您想传递要明确拆分的 "divisions"。
我正在尝试 运行 并使用我设置的 dask 集群并行化此 sqlalchemy 查询,因为我没有足够的内存来从我的本地计算机执行它。
我的代码如下 - 我不确定这是否是完成此操作的最佳方法:
from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)
recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']
query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]
from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
connect_args={'protocol': 'https',
'requests_kwargs': {'verify': key}})
con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])
我收到以下错误:
TypeError: can't pickle _thread.RLock objects
您应该使用专为此目的而设计的功能read_sql_table
。如果您阅读文档字符串 and/or 代码,您会看到传递给工作人员的是查询本身,工作人员在本地创建自己的引擎实例。这是因为 sqlalchemy 实例的状态无法在工作人员之间发送,正如您所发现的。
请注意,read_sql_table
也关心数据的分区,因为这是 Dask,重点是处理大于内存的数据。在您的示例中,我猜 index/partitioning 列是 date
,并且您想传递要明确拆分的 "divisions"。