使用 SQLAlchemy 在两个数据库之间移动数据的最佳方法

Best way to move data between two databases using SQLAlchemy

(已回答)我对此的回答如下,希望对您有所帮助。

我对 SQLAlchemy 和整个 Python 都很陌生,我正在寻找一些建议。我正在考虑将数据从一个 Postgres 数据库移动到另一个 Postgres 数据库。我要移动 2000 万条以上的记录,我希望这项工作每天 运行。我想知道:

  1. 我应该使用 SQLAlchemy 核心还是 ORM? (我主要使用核心 到目前为止)
  2. 我目前使用的是 SQLAlchemy 版本“1.3.23”(我应该转到 1.4/2.x)?
  3. 如何将插入优化为 运行 更快? (我听说可能有我需要启用的标志?)

很遗憾,我无法使用 pyscopg2 复制功能,因为我没有数据库的超级用户访问权限。

我正在尝试仿效别人的堆栈溢出示例:the example i am following

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time
    
    if not batch:
        break

    for row in batch:
        ???

proxy.close()

我卡住的部分是在 for 循环中。如何将数据写入下一个数据库? 我应该使用什么function/s?

这是最好的方法还是我错得离谱?

我当前使用 1.4 版的代码迭代:

conn_p = create_engine(--db connection string--, echo=True)

conn_sl = create_engine(--db connection string--, echo=False)

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)


while 'batch not empty':      
    batch = proxy.fetchmany(10000)  
    
    list1 = []

    if not batch:
        break
    
    for row in batch:
        d = dict(row.items())
        list1.append(d)    
    
    insert_stmt = table_2.insert().values(list1) 
    conn_sl.execute(insert_stmt)    


proxy.close()

还是很慢,移动10k条记录需要15秒左右。 有什么建议吗?

  1. Postgres 不会让您在同一个会话中同时访问两个数据库。有技巧,但如果您没有管理员权限,您可能无法使用它们。这意味着需要与同一主机建立第二个连接。将数据假脱机发送到客户端并返回到服务器比执行服务器端副本要昂​​贵得多,值得 DBA 为定期操作设置某种计划副本。
  2. 像这样向批量操作添加 ORM 元素只会减慢您的速度。对于数百万行,它可能会变得虚弱。
  3. 不确定,sqlalchemy 1.4+ 有很好的查询语法,但可能对这个特定问题没有帮助
  4. 如果直接使用 SQL,提示很简单。将索引推迟到加载数据之后。您可能必须删除索引并在添加行后重新创建它。注意超大交易。参见 this discussion on bulk loading for more depth

因此,实例化第二个 connection/session,删除索引,根据 statement level docs 使用每行的属性执行基本的 INSERT 语句,然后重新创建索引。

经过大约一周的工作、测试和优化,我得到了以下结果。

我正在从事一些 ETL 工作,但主要是将数据从一个 postgresql 数据库提升和转移到另一个数据库。我需要它在我将表格传递给它的方式上是动态的,这样我就可以更轻松地移动表格。请告诉我你的想法。

schema_s = "source schema"
schema_d = "destination schema"


source_creds = ('db connection string')
destination_creds = ('db connection string')

#same as destination - I use psycopg2 because it shaved off a few seconds off the insert time
conn_ssyc = psycopg2.connect('db connection string')
cur = conn_ssyc.cursor()


#list of tables to move
tbl_to_load = [
    'table_name_1',
    'table_name_2',
    'table_name_3',
    'table_name_4'
    ]

for current_table in tbl_to_load:
    start_time_per = time.time()
    
    #create the needed engines
    conn_s = create_engine(source_creds, echo=False, echo_pool=False)
    conn_d = create_engine(destination_creds, echo=False, echo_pool=False)
    
    #get the columns in each current_table 
    df = pd.read_sql(
                "SELECT \
                   column_name\
                FROM information_schema.columns\
                    WHERE table_schema = '{}'\
                    AND table_name   = '{}'".format(schema_d, current_table), conn_d)
    cur = conn_ssyc.cursor()
    print(current_table)
    
    
    #creates a list of "%s" corresponding to the number of columns in each table. 
    #Eg table with 5 columns ("%s","%s","%s","%s","%s")

    inst_col = "("
    for i in df['column_name']:
        inst_col = inst_col + "%s,"
    inst_col = inst_col[:-1]    #removes the last comma
    inst_col = inst_col + ")"

    #defining the tables
    meta_p = MetaData(schema = schema_s)
    table_1 = Table(current_table,
                    meta_p,
                    autoload_with=conn_s
                   )


    meta_sl = MetaData(schema = schema_d)
    table_2 = Table(current_table,
                    meta_sl,
                    autoload_with=conn_d
                    )
    
    #truncate current table
    truncate_query = sqlalchemy.text("TRUNCATE TABLE {}.{}".format(schema_d, current_table))
    conn_d.execution_options(autocommit=True).execute(truncate_query)

    start_time= time.time()
    
    #steam the data
    q = select([table_1])
    proxy = conn_s.execution_options(stream_results=True).execute(q)
    
    end_time = time.time()
    total_time = end_time - start_time
    print("Time: ", total_time)
    
    batch_number = 0
    while 'batch not empty':  
        batch = proxy.fetchmany(10000)  # fetching batches of 10,000 rows at a time
        
        start_time= time.time()
        #batch_number is just to monitor progress
        batch_number = batch_number + 1
        print(batch_number)
        
        list1 = []

        if not batch:
            break

        for row in batch:
            d = dict(row.items())
            list1.append(d)    

        df = pd.DataFrame(list1)    

        tpls = [tuple(x) for x in df.to_numpy()]
        
        #Here is my insert, so far based on the table size (52 columns wide, 4.3m records, and full data size in csv is 3.5gb) I can move 10k records in 3-5 seconds per batch
        
        args_str = ','.join(cur.mogrify(inst_col, x).decode('utf-8') for x in tpls)
        cur.execute("INSERT INTO {}.{} VALUES ".format(schema_d, current_table) + args_str)
        conn_ssyc.commit()
        #4-5 sec or so for 10k
        
        end_time = time.time()
        total_time = end_time - start_time
        print("Time: ", total_time)

    cur.close()
    proxy.close()
    
    end_time = time.time()
    total_time = end_time - start_time_per
    print("Time: ", total_time)