psycopg2 AttributeError: 'Cursor' object has no attribute 'mogrify'

psycopg2 AttributeError: 'Cursor' object has no attribute 'mogrify'

我有一个 class 可以帮助我在 Postgres 中进行 SQL 查询和插入。我现在正在使用 psycopg2==2.7.5。我使用的方法之一如下所示:

import pandas as pd    
import psycopg2.extensions as ps_ext
from typing import List

def insert_with_open_connection(self, df: pd.DataFrame, table_name: str, cursor: ps_ext.cursor,
                                conn: ps_ext.connection,
                                success_msg: str = 'Success',
                                conflict_cols: List[str] = None):
    try:
        # Format the INSERT SQL query
        cols = str(tuple(df.columns)).replace("'", '')
        nc = df.shape[1]
        ss = "(" + ''.join('%s,' for _ in range(nc))[:-1] + ")"
        try:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
        except psycopg2.ProgrammingError:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in self.clean_numpy_int_for_mogrify(df.values)),
                           'utf-8')
        args_str = args_str.replace("\'NaN\'::float", 'NULL')
        insert_sql = f'INSERT INTO {table_name} {cols} VALUES {args_str}'
        if conflict_cols is not None:
            conf_cols = str(tuple(conflict_cols)).replace("'", '').replace(',)', ')')
            insert_sql += f"\nON CONFLICT {conf_cols} DO NOTHING"
        insert_sql += ';'
        cursor.execute(insert_sql)
        conn.commit()
        return success_msg, 200
    except Exception:
        return traceback.format_exc(), 400

conncursor 参数是从 SqlAlchemy Engine 生成的,代码如下:

def create_pool(self, **db_config):
    db_user = self.user
    db_pass = self.password
    db_name = self.database

    # e.g. "/cloudsql"
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    cloud_sql_connection_name = os.environ.get("CLOUD_SQL_CONNECTION_NAME",
                                               '<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>')

    self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

def get_db_connection(self) -> Connection:
    if self.pool is None:
        self.create_pool()

    assert isinstance(self.pool, Engine)
    try:
        return self.pool.raw_connection()
    except psycopg2.OperationalError:
        self.create_pool()
        return self.pool.raw_connection()

@contextlib.contextmanager
def db_connect(self):
    db = self.get_db_connection()
    cur = db.cursor()
    try:
        yield db, cur
    finally:
        db.close()

我正在尝试在 Google 云函数 (Linux) 中使用此代码,当我 运行 [=17] 时得到以下 error/traceback =] 那里的方法:

Traceback (most recent call last):
  File "/workspace/db/sql_helper.py", line 221, in insert_with_open_connection
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
  File "/workspace/db/sql_helper.py", line 221, in <genexpr>
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
AttributeError: 'Cursor' object has no attribute 'mogrify'

很明显代码中的cursor似乎没有属性mogrify,但是根据文档heremogrify方法应该存在。

我查看了代码,发现您使用的是 import psycopg2.extensions as ps_ext;显然 mogrify 在 文档。

然后我遇到了这条线:

self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

您没有使用 psycopg2 驱动程序;但是 pg8000 和追踪 生成事物的方式,db.cursor() 返回的游标又由 self.pool.raw_connection() 创建, 我得出的结论是光标不是 ps_ext 光标,而是 一个 pg8000 游标,它没有 mogrify 方法 显示在:https://github.com/tlocke/pg8000/blob/main/pg8000/dbapi.py

这就是您出现此错误的可能性。我认为 解决方案是改为使用 psycopg2 驱动程序。

也就是说,这个答案可能是错误的,我找错人了。