psycopg2 AttributeError: 'Cursor' object has no attribute 'mogrify'
psycopg2 AttributeError: 'Cursor' object has no attribute 'mogrify'
我有一个类 (class),用于在 Postgres 中执行 SQL 查询和插入。我目前使用的是 psycopg2==2.7.5。我使用的方法之一如下所示:
import pandas as pd
import psycopg2.extensions as ps_ext
from typing import List
def insert_with_open_connection(self, df: pd.DataFrame, table_name: str, cursor: ps_ext.cursor,
                                conn: ps_ext.connection,
                                success_msg: str = 'Success',
                                conflict_cols: List[str] = None):
    """Insert every row of ``df`` into ``table_name`` with a single INSERT.

    The statement is assembled client-side: each row is rendered to a SQL
    tuple with ``cursor.mogrify`` and the rendered rows are joined into one
    multi-row ``VALUES`` list, which is then executed and committed.

    Args:
        df: Rows to insert; its column labels become the target column list.
        table_name: Target table. It is interpolated into the SQL text
            verbatim, so it must come from trusted code, never user input.
        cursor: Open cursor used to render and execute the statement.
        conn: Connection on which ``commit()`` is called after the insert.
        success_msg: Message returned when the insert succeeds.
        conflict_cols: Optional column names; when given, an
            ``ON CONFLICT (...) DO NOTHING`` clause is appended.

    Returns:
        ``(success_msg, 200)`` on success, otherwise
        ``(formatted traceback, 400)``. Exceptions are not propagated.
    """
    try:
        # Format the INSERT SQL query
        # Strip the quotes from the tuple repr; the ',)' cleanup keeps a
        # single-column frame from producing the invalid list "(col,)".
        cols = str(tuple(df.columns)).replace("'", '').replace(',)', ')')
        nc = df.shape[1]
        # One "%s" placeholder per column, e.g. "(%s,%s,%s)".
        ss = "(" + ",".join(["%s"] * nc) + ")"
        try:
            # NOTE(review): mogrify() is a psycopg2 extension to the DB-API;
            # a cursor created by another driver may not provide it.
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
        except psycopg2.ProgrammingError:
            # Retry with the values passed through clean_numpy_int_for_mogrify
            # (defined elsewhere; presumably converts numpy integers that the
            # driver cannot adapt -- confirm against that helper).
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in self.clean_numpy_int_for_mogrify(df.values)),
                           'utf-8')
        # Replace the rendered NaN float literal so that NULL is stored instead.
        args_str = args_str.replace("\'NaN\'::float", 'NULL')
        insert_sql = f'INSERT INTO {table_name} {cols} VALUES {args_str}'
        if conflict_cols is not None:
            conf_cols = str(tuple(conflict_cols)).replace("'", '').replace(',)', ')')
            insert_sql += f"\nON CONFLICT {conf_cols} DO NOTHING"
        insert_sql += ';'
        cursor.execute(insert_sql)
        conn.commit()
        return success_msg, 200
    except Exception:
        # Report any failure to the caller as a 400 with the full traceback.
        return traceback.format_exc(), 400
conn 和 cursor 参数是由 SQLAlchemy Engine 生成的,代码如下:
def create_pool(self, **db_config):
    """Create the SQLAlchemy engine and store it in ``self.pool``.

    The connection URL is built from ``self.user``, ``self.password`` and
    ``self.database``, and points at a Unix socket whose path is derived
    from the ``DB_SOCKET_DIR`` and ``CLOUD_SQL_CONNECTION_NAME`` environment
    variables (placeholder defaults are used when they are unset).

    Args:
        **db_config: Extra keyword arguments forwarded unchanged to
            ``sqlalchemy.create_engine``.
    """
    db_user = self.user
    db_pass = self.password
    db_name = self.database
    # e.g. "/cloudsql"
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")
    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    cloud_sql_connection_name = os.environ.get("CLOUD_SQL_CONNECTION_NAME",
                                               '<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>')
    self.pool = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        # NOTE: the "+pg8000" suffix of drivername selects the pg8000 DBAPI
        # driver (not psycopg2) for connections made by this engine.
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )
def get_db_connection(self) -> Connection:
    """Return a raw DBAPI connection from the engine in ``self.pool``.

    The engine is created on first use. If obtaining the connection raises
    ``psycopg2.OperationalError``, the engine is rebuilt and the call is
    retried once.

    Returns:
        The object returned by ``Engine.raw_connection()``.
    """
    if self.pool is None:
        self.create_pool()
    assert isinstance(self.pool, Engine)
    try:
        return self.pool.raw_connection()
    except psycopg2.OperationalError:
        # NOTE(review): create_pool() configures a pg8000 engine, so failures
        # here may never surface as psycopg2.OperationalError -- confirm
        # which exception type should trigger the rebuild-and-retry.
        self.create_pool()
        return self.pool.raw_connection()
@contextlib.contextmanager
def db_connect(self):
    """Yield a ``(connection, cursor)`` pair and release both on exit.

    The connection comes from ``self.get_db_connection()``. Both the cursor
    and the connection are closed when the ``with`` block ends, whether it
    finishes normally or raises.

    Yields:
        A tuple ``(db, cur)`` of the connection and a cursor opened on it.
    """
    db = self.get_db_connection()
    try:
        # Open the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cur = db.cursor()
        try:
            yield db, cur
        finally:
            cur.close()
    finally:
        db.close()
我正尝试在 Google Cloud Function (Linux) 中使用这段代码,在那里运行 insert_with_open_connection 方法时,得到了如下错误/traceback:
Traceback (most recent call last):
File "/workspace/db/sql_helper.py", line 221, in insert_with_open_connection
args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
File "/workspace/db/sql_helper.py", line 221, in <genexpr>
args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
AttributeError: 'Cursor' object has no attribute 'mogrify'
很明显,代码中的 cursor 似乎没有 mogrify 属性,但是根据文档,mogrify 方法应该存在。
我查看了代码,注意到您使用的是 import psycopg2.extensions as ps_ext;按照文档,它显然是有 mogrify 的。
然后我注意到了这一行代码:
self.pool = sqlalchemy.create_engine(
    # Equivalent URL:
    # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
    #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
    # NOTE: drivername below is "postgresql+pg8000", i.e. the pg8000 driver
    # is selected here, not psycopg2.
    sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                     username=db_user,  # e.g. "my-database-user"
                                     password=db_pass,  # e.g. "my-database-password"
                                     database=db_name,  # e.g. "my-database-name"
                                     query={"unix_sock":
                                            f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
    **db_config
)
您使用的并不是 psycopg2 驱动程序,而是 pg8000 驱动程序。追踪这些对象的生成方式可以看到:游标由 db.cursor() 返回,而 db 又是由 self.pool.raw_connection() 创建的。
因此我得出的结论是:这个游标不是 ps_ext 游标,而是一个 pg8000 游标,它没有 mogrify 方法,
参见:https://github.com/tlocke/pg8000/blob/main/pg8000/dbapi.py
这很可能就是您遇到此错误的原因。我认为解决方案是改为使用 psycopg2 驱动程序。
话虽如此,这个答案也可能是错误的,我可能找错了方向。
我有一个类 (class),用于在 Postgres 中执行 SQL 查询和插入。我目前使用的是 psycopg2==2.7.5。我使用的方法之一如下所示:
import pandas as pd
import psycopg2.extensions as ps_ext
from typing import List
def insert_with_open_connection(self, df: pd.DataFrame, table_name: str, cursor: ps_ext.cursor,
                                conn: ps_ext.connection,
                                success_msg: str = 'Success',
                                conflict_cols: List[str] = None):
    """Insert every row of ``df`` into ``table_name`` with a single INSERT.

    The statement is assembled client-side: each row is rendered to a SQL
    tuple with ``cursor.mogrify`` and the rendered rows are joined into one
    multi-row ``VALUES`` list, which is then executed and committed.

    Args:
        df: Rows to insert; its column labels become the target column list.
        table_name: Target table. It is interpolated into the SQL text
            verbatim, so it must come from trusted code, never user input.
        cursor: Open cursor used to render and execute the statement.
        conn: Connection on which ``commit()`` is called after the insert.
        success_msg: Message returned when the insert succeeds.
        conflict_cols: Optional column names; when given, an
            ``ON CONFLICT (...) DO NOTHING`` clause is appended.

    Returns:
        ``(success_msg, 200)`` on success, otherwise
        ``(formatted traceback, 400)``. Exceptions are not propagated.
    """
    try:
        # Format the INSERT SQL query
        # Strip the quotes from the tuple repr; the ',)' cleanup keeps a
        # single-column frame from producing the invalid list "(col,)".
        cols = str(tuple(df.columns)).replace("'", '').replace(',)', ')')
        nc = df.shape[1]
        # One "%s" placeholder per column, e.g. "(%s,%s,%s)".
        ss = "(" + ",".join(["%s"] * nc) + ")"
        try:
            # NOTE(review): mogrify() is a psycopg2 extension to the DB-API;
            # a cursor created by another driver may not provide it.
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
        except psycopg2.ProgrammingError:
            # Retry with the values passed through clean_numpy_int_for_mogrify
            # (defined elsewhere; presumably converts numpy integers that the
            # driver cannot adapt -- confirm against that helper).
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in self.clean_numpy_int_for_mogrify(df.values)),
                           'utf-8')
        # Replace the rendered NaN float literal so that NULL is stored instead.
        args_str = args_str.replace("\'NaN\'::float", 'NULL')
        insert_sql = f'INSERT INTO {table_name} {cols} VALUES {args_str}'
        if conflict_cols is not None:
            conf_cols = str(tuple(conflict_cols)).replace("'", '').replace(',)', ')')
            insert_sql += f"\nON CONFLICT {conf_cols} DO NOTHING"
        insert_sql += ';'
        cursor.execute(insert_sql)
        conn.commit()
        return success_msg, 200
    except Exception:
        # Report any failure to the caller as a 400 with the full traceback.
        return traceback.format_exc(), 400
conn 和 cursor 参数是由 SQLAlchemy Engine 生成的,代码如下:
def create_pool(self, **db_config):
    """Create the SQLAlchemy engine and store it in ``self.pool``.

    The connection URL is built from ``self.user``, ``self.password`` and
    ``self.database``, and points at a Unix socket whose path is derived
    from the ``DB_SOCKET_DIR`` and ``CLOUD_SQL_CONNECTION_NAME`` environment
    variables (placeholder defaults are used when they are unset).

    Args:
        **db_config: Extra keyword arguments forwarded unchanged to
            ``sqlalchemy.create_engine``.
    """
    db_user = self.user
    db_pass = self.password
    db_name = self.database
    # e.g. "/cloudsql"
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")
    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    cloud_sql_connection_name = os.environ.get("CLOUD_SQL_CONNECTION_NAME",
                                               '<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>')
    self.pool = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        # NOTE: the "+pg8000" suffix of drivername selects the pg8000 DBAPI
        # driver (not psycopg2) for connections made by this engine.
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )
def get_db_connection(self) -> Connection:
    """Return a raw DBAPI connection from the engine in ``self.pool``.

    The engine is created on first use. If obtaining the connection raises
    ``psycopg2.OperationalError``, the engine is rebuilt and the call is
    retried once.

    Returns:
        The object returned by ``Engine.raw_connection()``.
    """
    if self.pool is None:
        self.create_pool()
    assert isinstance(self.pool, Engine)
    try:
        return self.pool.raw_connection()
    except psycopg2.OperationalError:
        # NOTE(review): create_pool() configures a pg8000 engine, so failures
        # here may never surface as psycopg2.OperationalError -- confirm
        # which exception type should trigger the rebuild-and-retry.
        self.create_pool()
        return self.pool.raw_connection()
@contextlib.contextmanager
def db_connect(self):
    """Yield a ``(connection, cursor)`` pair and release both on exit.

    The connection comes from ``self.get_db_connection()``. Both the cursor
    and the connection are closed when the ``with`` block ends, whether it
    finishes normally or raises.

    Yields:
        A tuple ``(db, cur)`` of the connection and a cursor opened on it.
    """
    db = self.get_db_connection()
    try:
        # Open the cursor inside the try so the connection is still closed
        # if cursor creation itself fails.
        cur = db.cursor()
        try:
            yield db, cur
        finally:
            cur.close()
    finally:
        db.close()
我正尝试在 Google Cloud Function (Linux) 中使用这段代码,在那里运行 insert_with_open_connection 方法时,得到了如下错误/traceback:
Traceback (most recent call last):
File "/workspace/db/sql_helper.py", line 221, in insert_with_open_connection
args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
File "/workspace/db/sql_helper.py", line 221, in <genexpr>
args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
AttributeError: 'Cursor' object has no attribute 'mogrify'
很明显,代码中的 cursor 似乎没有 mogrify 属性,但是根据文档,mogrify 方法应该存在。
我查看了代码,注意到您使用的是 import psycopg2.extensions as ps_ext;按照文档,它显然是有 mogrify 的。
然后我注意到了这一行代码:
self.pool = sqlalchemy.create_engine(
    # Equivalent URL:
    # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
    #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
    # NOTE: drivername below is "postgresql+pg8000", i.e. the pg8000 driver
    # is selected here, not psycopg2.
    sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                     username=db_user,  # e.g. "my-database-user"
                                     password=db_pass,  # e.g. "my-database-password"
                                     database=db_name,  # e.g. "my-database-name"
                                     query={"unix_sock":
                                            f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
    **db_config
)
您使用的并不是 psycopg2 驱动程序,而是 pg8000 驱动程序。追踪这些对象的生成方式可以看到:游标由 db.cursor() 返回,而 db 又是由 self.pool.raw_connection() 创建的。
因此我得出的结论是:这个游标不是 ps_ext 游标,而是一个 pg8000 游标,它没有 mogrify 方法,
参见:https://github.com/tlocke/pg8000/blob/main/pg8000/dbapi.py
这很可能就是您遇到此错误的原因。我认为解决方案是改为使用 psycopg2 驱动程序。
话虽如此,这个答案也可能是错误的,我可能找错了方向。