SQLAlchemy cursors: AttributeError: 'tuple' object has no attribute 'items'
I'm trying to write an iterator that takes a query and exports the rows in batches as lists of dictionaries. Here is the complete example I'm working with:
class QueryStream(collections.Iterator):
    def __init__(self, conn_details, query, max_rows=None, batch_size=2000):
        # Initialize vars.
        self.engine = dst.get_connection(conn_details)
        self.query = query
        self.max_rows = max_rows
        self.batch_size = batch_size
        self.fetched_rows = 0
        # Create a database cursor from query.
        self.conn = self.engine.raw_connection()
        self.cursor = self.conn.cursor()
        self.cursor.execute(self.query)

    def next(self):
        try:
            if self.max_rows:
                if self.max_rows <= self.fetched_rows:
                    # Maximum rows have been fetched, so stop iterating.
                    raise StopIteration
                elif self.max_rows <= self.batch_size:
                    # Max rowset is small enough to be done in one batch.
                    batch_size = self.max_rows
                elif self.max_rows - self.fetched_rows < self.batch_size:
                    # On the final batch, fetch only the remaining rows.
                    batch_size = self.max_rows - self.fetched_rows
                else:
                    # Get default batch size.
                    batch_size = self.batch_size
            batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]
            if len(batch):
                self.fetched_rows += len(batch)
                print('Fetched {} rows so far.'.format(repr(self.fetched_rows)), file=sys.stderr)
                return batch
            else:
                # 0 rows were returned, so we're probably at the end.
                raise StopIteration
        except StopIteration:
            self.cursor.close()
            self.conn.close()
            self.engine.dispose()
            raise StopIteration
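(For reference, the batch-size arithmetic in next() can be isolated into a small pure function; this is my own sketch mirroring the logic above, and next_batch_size is a name I made up, not part of the class:)

```python
def next_batch_size(max_rows, fetched_rows, batch_size):
    """Mirror of the branching in next(): how many rows to fetch next.

    Returns 0 when iteration should stop (max_rows already fetched).
    """
    if max_rows:
        if max_rows <= fetched_rows:
            # Maximum rows have been fetched, so stop iterating.
            return 0
        if max_rows <= batch_size:
            # Max rowset is small enough to be done in one batch.
            return max_rows
        remaining = max_rows - fetched_rows
        if remaining < batch_size:
            # On the final batch, fetch only the remaining rows.
            return remaining
    # No cap, or a full batch still fits: use the default batch size.
    return batch_size

print(next_batch_size(None, 0, 2000))    # no cap -> 2000
print(next_batch_size(5000, 4000, 2000)) # final partial batch -> 1000
print(next_batch_size(5000, 5000, 2000)) # cap reached -> 0
```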
The problem is this line:
batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]
which throws this error:
AttributeError: 'tuple' object has no attribute 'items'
I was under the impression that each result in the result set would be returned as a RowProxy object that supports dict-like operations (as is mentioned in this post), but it appears the results are just plain tuples. The SQLAlchemy docs are not 100% clear on what the expected type of a cursor's results is, only offering this example of its use.
Question: Is there something wrong with my use of the cursor? I need the results to be a list of dicts keyed by column name, but I don't know whether that's possible when the rows come back as plain tuples rather than RowProxy objects.
Well, it turns out the problem was related to using a raw connection:
# Create a database cursor from query.
self.conn = self.engine.raw_connection()
self.cursor = self.conn.cursor()
self.cursor.execute(self.query)
...
batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]
When this is replaced with
# Create a database cursor from query.
self.engine = dst.get_connection(workspace_uuid, project_id)
self.stream = self.engine.execution_options(stream_results=True).execute(self.query)
...
batch = [dict(row.items()) for row in self.stream.fetchmany(batch_size)]
everything works fine. Fortunately, the driver I'm using (psycopg2) supports stream_results:
stream_results – Available on: Connection, statement. Indicate to the dialect that results should be “streamed” and not pre-buffered, if possible. This is a limitation of many DBAPIs. The flag is currently understood only by the psycopg2, mysqldb and pymysql dialects.
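For completeness: even if you do need to stay on a raw DBAPI cursor, the column names are available from cursor.description (PEP 249), so the plain tuples can be zipped into dicts without RowProxy. A minimal sketch using the stdlib sqlite3 driver; any DBAPI-compatible cursor should behave the same, and rows_as_dicts is a hypothetical helper name of my own:

```python
import sqlite3

def rows_as_dicts(cursor, batch_size):
    """Convert a batch of plain DBAPI tuples to dicts keyed by column name."""
    # cursor.description is a sequence of 7-tuples; element 0 is the name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchmany(batch_size)]

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (id INTEGER, name TEXT)")
cur.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
cur.execute("SELECT id, name FROM t")

batch = rows_as_dicts(cur, 2)
print(batch)  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```

Note this only converts the representation; unlike stream_results, it does nothing about whether the driver pre-buffers the full result set.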