通过 Clojure & JDBC 将 5,000,000 行移动到另一个 Postgresql 数据库

Moving 5,000,000 rows to another Postgresql DBs by Clojure & JDBC

我正在尝试将 5,000,000 行从一个 Postgre 数据库移动到另一个数据库。两个连接都在 Hikari CP 连接池中。

我浏览了很多文档和帖子。它给我留下了下面的代码。但它并不是真的可用:

(jdbc/with-db-connection [tx {:datasource source-db}]
  (jdbc/query tx
      [(jdbc/prepare-statement (jdbc/get-connection tx)
                                answer-sql
                                {:fetch-size 100000})]
                  {:result-set-fn (fn [result-set]
                                    (jdbc/insert-multi!
                                     {:datasource target-db}
                                     :migrated_answers
                                     result-set))}))

我已经尝试了很多不同的形式。 jdbc/with-db-transaction 或我能想到的任何其他方法都没有太大帮助。

  1. 很多教程和帖子只提到了如何处理整个结果的方式。进入 RAM 的小表绝对没问题,但它看起来很快。但事实并非如此。

  2. 所以当我正确使用 :fetch-size 并且我的 RAM 没有爆炸(hocus pocus)时,传输速度非常慢,两个连接在 'active' 和 [= 之间切换43=] 数据库端的状态。我从来没有等这么久才找到任何实际传输的数据!

    当我在 Talend Open Studio(生成 Java 代码的 ETL 工具)中创建这个简单的批处理时,它会在 5 分钟内传输所有数据。那里的光标大小 "also" 设置为 100000。我认为 Clojure 的干净代码应该更快。

  3. 我得到的最快结果是使用下面的代码。我认为是因为 :as-array 参数。如果我不使用 :max-rows 参数内存爆炸,因为它没有被延迟处理,所以我不能将它用于整个传输。为什么?我不明白这里的规则。

    (jdbc/with-db-transaction [tx {:datasource source-db}]
      (jdbc/query tx
                  [(jdbc/prepare-statement (:connection tx)
                                            answer-sql
                                           {:result-type :forward-only
                                            :concurrency :read-only
                                            :fetch-size 2000
                                            :max-size 250000})]
                  {:as-arrays? true
                   :result-set-fn (fn [result-set]
                                    (let [keys (first result-set)
                                          values (rest result-set)]
                                      (jdbc/insert-multi! 
                                         {:datasource dct-db}
                                          :dim_answers
                                           keys values)))}))
    

如果我明显缺少任何帮助或信息,我将不胜感激。

我认为这里的关键观察是,虽然您的 query 是懒惰地从一个数据库流式传输结果,但您的 insert 只是一个巨人写入另一个数据库。关于内存使用,如果您在最后为单个写入操作收集所有这些结果(在内存中),我认为无论您是否懒惰地流式传输查询结果都没有太大区别。

平衡内存使用和吞吐量的一种方法是批量写入:

(db/with-db-transaction [tx {:datasource source-db}]
  (db/query tx
    [(db/prepare-statement (:connection tx)
                           answer-sql
                           {:result-type :forward-only
                            :concurrency :read-only
                            :fetch-size 2000})]
    {:as-arrays? true
     :result-set-fn (fn [result-set]
                      (let [keys (first result-set)
                            values (rest result-set)]
                        (doseq [batch (partition-all 2000 values)]
                          (db/insert-multi! {:datasource dct-db}
                                            :dim_answers
                                            keys
                                            batch))))}))

不同之处在于它使用partition-all 批量插入values(大小与:fetch-size 相同,但我相信这可以调整)。通过将 JVM 最大堆大小设置为类似 -Xmx1g 的值,将此方法的 performance/memory 用法与其他方法进行比较。我无法使用此堆大小完成非批处理版本。

我能够在大约 1 分钟内在笔记本电脑上的本地 PostgreSQL 数据库之间迁移 600 万个小行,java 使用 <400MB 内存。我也用过HikariCP.

如果您确实批量插入,如果适合您的用例,您可能需要考虑将所有插入包装在单个事务中。为简洁起见,我在这里留下了额外的交易。

If i dont use :max-size parameter memory explodes

我在最新的 clojure.java.jdbc 中找不到关于此选项的任何参考(除了规范),它没有影响我的测试。我确实看到 :max-rows 但你肯定不想要那个。

I think it is because the :as-array parameter.

我希望这对内存使用有益;行向量应该比行映射更 space 高效。

这个解决方案最适合我,而且似乎也比 Taylor 的解决方案更快。但非常感谢你帮助我。

在事务完成之前不会提交。我必须遇到任何问题,看看我是否不必拉皮条,但我现在很高兴。我试图用 with-db-connection 替换第一笔交易,但它使记录直接加载到 RAM 中。

(defn data->transfer2 [sql table]
     (jdbc/with-db-transaction [read-tx {:datasource dag-db}]
     (jdbc/with-db-transaction [tx {:datasource dct-db}]
        (jdbc/query read-tx
                  [(jdbc/prepare-statement (:connection read-tx)
                                           answer-sql
                                           {:result-type :forward-only
                                            :concurrency :read-only
                                            :fetch-size 100000})]
                  {:as-arrays? true
                   :result-set-fn (fn [result-set]
                                    (let [keys (first result-set)
                                          values (rest result-set)]
                                      (doseq [btch (partition-all 100000 values)]
                                        (jdbc/insert-multi! tx
                                                            :dim_answers
                                                             keys
                                                             btch))))})))