Pandas read_sql 使用 pyodbc 处理损坏的数据

Pandas read_sql with pyodbc to handle corrupt data

我正在与一个拥有 4D 数据库的客户合作。 Tableau 不会连接到它。 (那完全是另一个问题,如果您知道答案,请告诉我。)我们决定做的基本上是保留两个数据副本。我正在 Python 中构建一个工具,它将从他们的数据库中获取任意 table 并将其副本存储在 MySQL 数据库中。然后它将 运行 定期并在添加新数据时更新数据。

我更愿意使用 SqlAlchemy,但它不支持 4D。因此,我将 pyodbc 与 pandas 一起使用。我正在使用

data_chunks = pandas.read_sql("SELECT * FROM table_name", con=pyodbc_connection, chunksize=100000)

然后我转身使用

chunk_df.to_sql("table_name", con=sqlalchemy_mysql_connection, index=False, if_exists="append")

将其写入 MySQL 数据库。

不幸的是,在我读入的某些 table 中,有损坏的数据,我得到一个 ValueErrorThe year xxxxx 超出范围。

跟踪中调用的最后一个函数是 data = cursor.fetchmany(chunksize),我相信它来自 pyodbc。

如何从任意 table 读取数据并能够优雅地处理损坏的数据并继续?

您可以想象使用 pyodbc Output Converter function 拦截损坏的日期值,并 "fix" 使用类似于以下的代码:

def unpack_sql_type_timestamp(raw_bytes):
    y, m, d, h, n, s, f = struct.unpack("<h5HI", raw_bytes)
    if y > 9999:
        y = 9999
    elif y < 1:
        y = 1
    return datetime.datetime(y, m, d, h, n, s, f)

pyodbc_connection = pyodbc.connect(connection_string)

pyodbc_connection.add_output_converter(
    pyodbc.SQL_TYPE_TIMESTAMP, 
    unpack_sql_type_timestamp
)

data_chunks = pandas.read_sql_query(
    "SELECT * FROM table_name", 
    con=pyodbc_connection, 
    chunksize=100000
)