python asynchronous data pull error: __aexit__/__enter__
python asynchronous data pull error: __aexit__/__enter__
我正在尝试编写利用 Python 异步功能的代码。我有一个数据库连接类(class),其中包含连接/断开数据库以及获取数据的代码。现在我想基于一组标识符,使用其中的获取数据方法异步地获取数据。代码如下:
import pyexasol
import pandas as pd
import logging
from typing import Iterable
import asyncio
import tqdm
class Exa(object):
    """Thin wrapper around one pyexasol connection.

    The connection is opened lazily by the fetch methods and stored on
    ``self.conn``; it is ``None`` whenever no connection is held.
    """

    def __init__(self, dsn: str = '1.2.3.4',
                 user: str = os.environ['UID'],
                 password: str = os.environ['PWD']):
        """Remember the connection settings without connecting.

        The ``user`` and ``password`` defaults are read from ``os.environ``
        once, when this ``def`` statement is executed, not on each call.
        NOTE(review): ``os`` is not among the imports shown with this class.
        """
        self.conn = None
        self.__dsn, self.__user, self.__password = dsn, user, password

    def __connect(self):
        """Open the connection if none is held; a failure is only logged."""
        if self.conn is not None:
            return
        try:
            self.conn = pyexasol.connect(
                dsn=self.__dsn,
                user=self.__user,
                password=self.__password,
                encryption=True,
            )
        except Exception as e:
            logging.error(f"Error in connecting with Exasol. Error is: {e}")

    def __disconnect(self):
        """Close the held connection, if any, and reset ``self.conn``."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        except Exception as e:
            logging.error(f"Exception in disconnecting DB. Error is {e}")
        # The handle is dropped even when close() failed.
        self.conn = None

    def fetch(self, query: str, leave_connection_open: bool = False) -> pd.DataFrame:
        """Run ``query`` and return the result with lower-cased column names.

        Any exception while exporting closes the connection and yields an
        empty DataFrame instead of propagating. On success the connection is
        closed unless ``leave_connection_open`` is true.
        """
        self.__connect()
        try:
            frame = self.conn.export_to_pandas(query)
            frame.columns = frame.columns.str.lower()
        except Exception:
            self.__disconnect()
            return pd.DataFrame()
        if leave_connection_open:
            return frame
        self.__disconnect()
        return frame

    def fetch_batch(self, pattern: str, replacement: Iterable,
                    query: str, batchsize: int = 5000) -> pd.DataFrame:
        """Synchronous entry point: drive ``_fetch_batch`` with ``asyncio.run``."""
        return asyncio.run(
            self._fetch_batch(pattern=pattern, replacement=replacement,
                              query=query, batchsize=batchsize)
        )

    async def _fetch_batch(self, pattern: str, replacement: Iterable,
                           query: str, batchsize: int = 5000) -> pd.DataFrame:
        """Substitute batches of values for ``pattern`` in ``query`` and run each.

        ``replacement`` is cut into slices of ``batchsize`` values. Each slice
        is rendered as a comma-separated list (single-quoted when any value is
        a ``str``) and replaces ``pattern`` in ``query``. The per-batch results
        are concatenated and the column names lower-cased.

        If anything inside the ``try`` fails, the error is logged and ``res``
        is never bound, so the concatenation below raises.
        """
        replacement = list(replacement)
        # breaking into batches
        slices = [replacement[start:start + batchsize]
                  for start in range(0, len(replacement), batchsize)]
        if any(isinstance(item, str) for item in replacement):
            batches = ["'" + "','".join(part) + "'" for part in slices]
        else:
            # NOTE(review): str.join accepts only strings, so non-string
            # values (e.g. ints) raise TypeError on this line.
            batches = [",".join(part) for part in slices]
        # connecting and executing query in batches
        nbatches = len(batches)
        self.__connect()
        try:
            tasks = [
                self.__run_batch_query(query=query.replace(pattern, in_list),
                                       i=idx, nbatches=nbatches)
                for idx, in_list in enumerate(batches)
            ]
            # progress bar
            res = [await done
                   for done in tqdm.tqdm(asyncio.as_completed(tasks), total=nbatches)]
        except Exception as e:
            logging.error("Could not fetch batches of data. Error is: %s", e)
        # No disconnect happens here: the `finally: self.__disconnect()`
        # clause is disabled in this version, so the connection stays open.
        # dataframe concatenation
        combined = pd.concat(res)
        combined.columns = combined.columns.str.lower()
        return combined

    async def __run_batch_query(self, query: str,
                                i: int, nbatches: int) -> pd.DataFrame:
        """Log progress for batch ``i`` (0-based) and run its query via ``fetch``."""
        logging.info("Fetching %d/%d", i + 1, nbatches)
        # NOTE(review): fetch() is a plain method returning a DataFrame, which
        # is used here as an async context manager; this is the line named in
        # the reported `AttributeError: __aexit__` traceback.
        async with self.fetch(query=query, leave_connection_open=True) as resp:
            return await resp
我这样运行这段代码:
from foo import Exa

# Create the wrapper with its default DSN and credentials.
db = Exa()

# Pull 100 application ids to use as the substitution values.
ids = db.fetch('select id from application limit 100')

# Re-query those ids in batches of 25; 'IDS' in the query text is the
# placeholder that each batch replaces.
ids1 = db.fetch_batch(
    pattern='IDS',
    replacement=ids['id'],
    query='select id from application where id in (IDS)',
    batchsize=25,
)
但随后我得到如下错误:
ERROR:root:Could not fetch batches of data. Error is: __aexit__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
async with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __aexit__
此外,如果我把 __run_batch_query() 方法里对 self.fetch() 的调用改成不带 async 的普通 with 语句,则错误会变为:
ERROR:root:Could not fetch batches of data. Error is: __enter__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __enter__
请问我哪里做错了?
我是 pyexasol 的作者。
请注意,asyncio 对 Exasol 相关的场景没有任何好处。asyncio 运行在单个 CPU 上,并且使用单个网络连接,因此无法实现任何有意义的扩展。
从 Exasol 服务器加载数据最高效的方法是:
- 单个 Python 进程:使用 export_to_pandas() 或 export_to_callback();
- 多个 Python 进程:使用 export_parallel() + http_transport()。
请查看 HTTP Transport (parallel) 手册页以获取说明和示例。这种方法可以线性扩展,您甚至可以在多台服务器上运行计算任务。
对于简单的场景,如果您通过慢速网络(例如 WiFi)传输大量数据,可以考虑使用 compression=True 连接选项。
我正在尝试编写利用 Python 异步功能的代码。我有一个数据库连接类(class),其中包含连接/断开数据库以及获取数据的代码。现在我想基于一组标识符,使用其中的获取数据方法异步地获取数据。代码如下:
import pyexasol
import pandas as pd
import logging
from typing import Iterable
import asyncio
import tqdm
class Exa(object):
    """Thin wrapper around one pyexasol connection.

    The connection is opened lazily by the fetch methods and stored on
    ``self.conn``; it is ``None`` whenever no connection is held.
    """

    def __init__(self, dsn: str = '1.2.3.4',
                 user: str = os.environ['UID'],
                 password: str = os.environ['PWD']):
        """Remember the connection settings without connecting.

        The ``user`` and ``password`` defaults are read from ``os.environ``
        once, when this ``def`` statement is executed, not on each call.
        NOTE(review): ``os`` is not among the imports shown with this class.
        """
        self.conn = None
        self.__dsn, self.__user, self.__password = dsn, user, password

    def __connect(self):
        """Open the connection if none is held; a failure is only logged."""
        if self.conn is not None:
            return
        try:
            self.conn = pyexasol.connect(
                dsn=self.__dsn,
                user=self.__user,
                password=self.__password,
                encryption=True,
            )
        except Exception as e:
            logging.error(f"Error in connecting with Exasol. Error is: {e}")

    def __disconnect(self):
        """Close the held connection, if any, and reset ``self.conn``."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        except Exception as e:
            logging.error(f"Exception in disconnecting DB. Error is {e}")
        # The handle is dropped even when close() failed.
        self.conn = None

    def fetch(self, query: str, leave_connection_open: bool = False) -> pd.DataFrame:
        """Run ``query`` and return the result with lower-cased column names.

        Any exception while exporting closes the connection and yields an
        empty DataFrame instead of propagating. On success the connection is
        closed unless ``leave_connection_open`` is true.
        """
        self.__connect()
        try:
            frame = self.conn.export_to_pandas(query)
            frame.columns = frame.columns.str.lower()
        except Exception:
            self.__disconnect()
            return pd.DataFrame()
        if leave_connection_open:
            return frame
        self.__disconnect()
        return frame

    def fetch_batch(self, pattern: str, replacement: Iterable,
                    query: str, batchsize: int = 5000) -> pd.DataFrame:
        """Synchronous entry point: drive ``_fetch_batch`` with ``asyncio.run``."""
        return asyncio.run(
            self._fetch_batch(pattern=pattern, replacement=replacement,
                              query=query, batchsize=batchsize)
        )

    async def _fetch_batch(self, pattern: str, replacement: Iterable,
                           query: str, batchsize: int = 5000) -> pd.DataFrame:
        """Substitute batches of values for ``pattern`` in ``query`` and run each.

        ``replacement`` is cut into slices of ``batchsize`` values. Each slice
        is rendered as a comma-separated list (single-quoted when any value is
        a ``str``) and replaces ``pattern`` in ``query``. The per-batch results
        are concatenated and the column names lower-cased.

        If anything inside the ``try`` fails, the error is logged and ``res``
        is never bound, so the concatenation below raises.
        """
        replacement = list(replacement)
        # breaking into batches
        slices = [replacement[start:start + batchsize]
                  for start in range(0, len(replacement), batchsize)]
        if any(isinstance(item, str) for item in replacement):
            batches = ["'" + "','".join(part) + "'" for part in slices]
        else:
            # NOTE(review): str.join accepts only strings, so non-string
            # values (e.g. ints) raise TypeError on this line.
            batches = [",".join(part) for part in slices]
        # connecting and executing query in batches
        nbatches = len(batches)
        self.__connect()
        try:
            tasks = [
                self.__run_batch_query(query=query.replace(pattern, in_list),
                                       i=idx, nbatches=nbatches)
                for idx, in_list in enumerate(batches)
            ]
            # progress bar
            res = [await done
                   for done in tqdm.tqdm(asyncio.as_completed(tasks), total=nbatches)]
        except Exception as e:
            logging.error("Could not fetch batches of data. Error is: %s", e)
        # No disconnect happens here: the `finally: self.__disconnect()`
        # clause is disabled in this version, so the connection stays open.
        # dataframe concatenation
        combined = pd.concat(res)
        combined.columns = combined.columns.str.lower()
        return combined

    async def __run_batch_query(self, query: str,
                                i: int, nbatches: int) -> pd.DataFrame:
        """Log progress for batch ``i`` (0-based) and run its query via ``fetch``."""
        logging.info("Fetching %d/%d", i + 1, nbatches)
        # NOTE(review): fetch() is a plain method returning a DataFrame, which
        # is used here as an async context manager; this is the line named in
        # the reported `AttributeError: __aexit__` traceback.
        async with self.fetch(query=query, leave_connection_open=True) as resp:
            return await resp
我这样运行这段代码:
from foo import Exa

# Create the wrapper with its default DSN and credentials.
db = Exa()

# Pull 100 application ids to use as the substitution values.
ids = db.fetch('select id from application limit 100')

# Re-query those ids in batches of 25; 'IDS' in the query text is the
# placeholder that each batch replaces.
ids1 = db.fetch_batch(
    pattern='IDS',
    replacement=ids['id'],
    query='select id from application where id in (IDS)',
    batchsize=25,
)
但随后我得到如下错误:
ERROR:root:Could not fetch batches of data. Error is: __aexit__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
async with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __aexit__
此外,如果我把 __run_batch_query() 方法里对 self.fetch() 的调用改成不带 async 的普通 with 语句,则错误会变为:
ERROR:root:Could not fetch batches of data. Error is: __enter__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __enter__
请问我哪里做错了?
我是 pyexasol 的作者。
请注意,asyncio 对 Exasol 相关的场景没有任何好处。asyncio 运行在单个 CPU 上,并且使用单个网络连接,因此无法实现任何有意义的扩展。
从 Exasol 服务器加载数据最高效的方法是:
- 单个 Python 进程:使用 export_to_pandas() 或 export_to_callback();
- 多个 Python 进程:使用 export_parallel() + http_transport()。
请查看 HTTP Transport (parallel) 手册页以获取说明和示例。这种方法可以线性扩展,您甚至可以在多台服务器上运行计算任务。
对于简单的场景,如果您通过慢速网络(例如 WiFi)传输大量数据,可以考虑使用 compression=True 连接选项。