并行 SQL 查询
Parallel SQL queries
运行 SQL 如何使用 dask 并行查询具有不同列维度的查询?以下是我的尝试:
from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
@delayed
def loadsql(sql):
return pd.read_sql_query(sql,con)
results = [loadsql(x) for x in sql_to_run]
dask.compute(results)
df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]
但是这会导致抛出以下错误:
数据库错误:sql 执行失败:"SQL QUERY"
ORA-01013: 用户请求取消当前操作
无法回滚
然后不久又出现另一个错误:
MultipleInstanceError:正在创建多个不兼容的 TerminalInteractiveShell 子类实例。
sql_to_run 是不同 sql 查询的列表
有什么建议或指点吗??谢谢!
更新 9.7.18
我认为这更多是因为我没有足够仔细地阅读文档。实际上,在 loadsql 函数之外的 con 导致了问题。以下是现在似乎按预期工作的代码更改。
def loadsql(sql):
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
result = pd.read_sql_query(sql,con)
con.close()
return result
values = [delayed(loadsql)(x) for x in sql_to_run]
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds
我的猜测是,oracle 客户端不是线程安全的。如果 conn 对象序列化,您可以尝试 运行 个进程(通过使用多处理调度程序或分布式调度程序)——这不太可能。更有可能工作的是在 loadsql
内创建连接,因此每次调用都会重新创建连接,并且希望不同的连接不会相互干扰。
运行 SQL 如何使用 dask 并行查询具有不同列维度的查询?以下是我的尝试:
from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
@delayed
def loadsql(sql):
return pd.read_sql_query(sql,con)
results = [loadsql(x) for x in sql_to_run]
dask.compute(results)
df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]
但是这会导致抛出以下错误:
数据库错误:sql 执行失败:"SQL QUERY" ORA-01013: 用户请求取消当前操作 无法回滚
然后不久又出现另一个错误:
MultipleInstanceError:正在创建多个不兼容的 TerminalInteractiveShell 子类实例。
sql_to_run 是不同 sql 查询的列表
有什么建议或指点吗??谢谢!
更新 9.7.18
我认为这更多是因为我没有足够仔细地阅读文档。实际上,在 loadsql 函数之外的 con 导致了问题。以下是现在似乎按预期工作的代码更改。
def loadsql(sql):
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
result = pd.read_sql_query(sql,con)
con.close()
return result
values = [delayed(loadsql)(x) for x in sql_to_run]
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds
我的猜测是,oracle 客户端不是线程安全的。如果 conn 对象序列化,您可以尝试 运行 个进程(通过使用多处理调度程序或分布式调度程序)——这不太可能。更有可能工作的是在 loadsql
内创建连接,因此每次调用都会重新创建连接,并且希望不同的连接不会相互干扰。