Missing data while performing many frequent UPSERT requests with PostgreSQL and Django

I am trying to do an UPSERT with Django 2.0 and Postgres 9.5.

My table:

\d+ candle
                                                           Table "public.candle"
      Column       |           Type           |                      Modifiers                      | Storage | Stats target | Description 
-------------------+--------------------------+-----------------------------------------------------+---------+--------------+-------------
 id                | integer                  | not null default nextval('candle_id_seq'::regclass) | plain   |              | 
 mts               | timestamp with time zone | not null                                            | plain   |              | 
 open              | numeric(200,40)          | not null                                            | main    |              | 
 close             | numeric(200,40)          | not null                                            | main    |              | 
 high              | numeric(200,40)          | not null                                            | main    |              | 
 low               | numeric(200,40)          | not null                                            | main    |              | 
 volume            | numeric(200,40)          | not null                                            | main    |              | 
 pair_timeframe_id | integer                  | not null                                            | plain   |              | 
Indexes:
    "candle_pkey" PRIMARY KEY, btree (id)
    "candle_mts_84b62390_uniq" UNIQUE CONSTRAINT, btree (mts)
    "candle_pair_timeframe_id_3f7b76ce" btree (pair_timeframe_id)
Foreign-key constraints:
    "candle_pair_timeframe_id_3f7b76ce_fk_pair_timeframe_id" FOREIGN KEY (pair_timeframe_id) REFERENCES pair_timeframe(id) DEFERRABLE INITIALLY DEFERRED

I implemented it like this:

    from django.db import connection

    # `data` holds the pre-formatted VALUES tuples; both values are
    # interpolated directly into the SQL text.
    with connection.cursor() as cursor:
        g = '''
            INSERT INTO candle (
              pair_timeframe_id, mts, open, close, high, low, volume
            ) VALUES %s
            ON CONFLICT (mts)
            DO UPDATE SET
                open = EXCLUDED.open,
                close = EXCLUDED.close,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                volume = EXCLUDED.volume
            WHERE candle.pair_timeframe_id = %s;
        ''' % (data, str(pairs_timeframes_id))
        cursor.execute(g)

There can be many value tuples, but usually each request contains only one row.

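The problem is that sometimes a row does not get inserted. When testing, I see rows missing. In the example below I can see 18:40 (15:40 UTC), but when 18:41 appears, 18:40 disappears.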

The data as received from the source:

(5, '2018-03-15 15:39:00', 8035, 8016.9, 8048.8, 8016.9, 129.90146749)
(5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
(5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
(5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
(5, '2018-03-15 15:40:00', 8020, 8020.1, 8023.1, 8020, 11.28589153)
(5, '2018-03-15 15:40:00', 8020, 8021.9, 8023.1, 8020, 15.23452768)
(5, '2018-03-15 15:40:00', 8020, 8022.1, 8025.8, 8020, 20.58452768)
(5, '2018-03-15 15:40:00', 8020, 8022.1, 8025.8, 8020, 20.58452768)
(5, '2018-03-15 15:40:00', 8020, 8021.8, 8037.1, 8020, 31.45192768)
(5, '2018-03-15 15:40:00', 8020, 8032, 8037.2, 8020, 36.93092875)
(5, '2018-03-15 15:40:00', 8020, 8033.7, 8040, 8020, 42.09837769)
(5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 48.50334402)
(5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 50.55369635)
(5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 53.15240701)
(5, '2018-03-15 15:40:00', 8020, 8038.8, 8040, 8020, 55.27052599)
(5, '2018-03-15 15:40:00', 8020, 8038.9, 8040, 8020, 55.67972643)
(5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 59.02972643)
(5, '2018-03-15 15:41:00', 8038.9, 8039, 8040, 8038.9, 1.0021322)
(5, '2018-03-15 15:41:00', 8038.9, 8039.3, 8040, 8038.9, 6.55185595)
(5, '2018-03-15 15:41:00', 8038.9, 8039.2, 8040, 8038.9, 11.85041595)
(5, '2018-03-15 15:41:00', 8038.9, 8040, 8040, 8038.9, 18.893392)

The database requests executed for the data above:

2018-03-15 11:39:55 EDT [14104-154] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:39:00', 8035, 8016.9, 8048.8, 8016.9, 129.90146749)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:39:59 EDT [14104-159] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:04 EDT [14104-163] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:08 EDT [14104-169] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:39:00', 8035, 8021.4, 8048.8, 8016.9, 138.89783357)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:09 EDT [14104-171] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8020.1, 8023.1, 8020, 11.28589153)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:13 EDT [14104-176] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8021.9, 8023.1, 8020, 15.23452768)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:18 EDT [14104-180] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8022.1, 8025.8, 8020, 20.58452768)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:22 EDT [14104-183] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8022.1, 8025.8, 8020, 20.58452768)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:27 EDT [14104-188] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8021.8, 8037.1, 8020, 31.45192768)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:31 EDT [14104-192] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8032, 8037.2, 8020, 36.93092875)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:36 EDT [14104-197] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8033.7, 8040, 8020, 42.09837769)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:40 EDT [14104-203] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 48.50334402)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:45 EDT [14104-208] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 50.55369635)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:49 EDT [14104-213] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 53.15240701)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:54 EDT [14104-216] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8038.8, 8040, 8020, 55.27052599)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:40:58 EDT [14104-219] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8038.9, 8040, 8020, 55.67972643)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:41:02 EDT [15770-1] trader@crypto_trader_db LOG:  execute <unnamed>: SET extra_float_digits = 3
2018-03-15 11:41:02 EDT [15770-2] trader@crypto_trader_db LOG:  execute <unnamed>: set search_path='public'
2018-03-15 11:41:02 EDT [14104-224] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:40:00', 8020, 8040, 8040, 8020, 59.02972643)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:41:03 EDT [14104-226] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:41:00', 8038.9, 8039, 8040, 8038.9, 1.0021322)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:41:07 EDT [14104-231] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:41:00', 8038.9, 8039.3, 8040, 8038.9, 6.55185595)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:41:11 EDT [14104-235] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:41:00', 8038.9, 8039.2, 8040, 8038.9, 11.85041595)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

2018-03-15 11:41:16 EDT [14104-241] trader@crypto_trader_db LOG:  statement: 
                                        INSERT INTO candle (
                                          pair_timeframe_id, mts, open, close, high, low, volume
                                          ) VALUES (5, '2018-03-15 15:41:00', 8038.9, 8040, 8040, 8038.9, 18.893392)
                                          ON CONFLICT (mts)
                                          DO UPDATE SET
                                                open = EXCLUDED.open,
                                                close = EXCLUDED.close,
                                                high = EXCLUDED.high,
                                                low = EXCLUDED.low,
                                                volume = EXCLUDED.volume
                                          WHERE candle.pair_timeframe_id = 5;

This is what I end up with in the database (only id and mts shown):

    id, mts

308805  2018-03-15 18:41:00.000000
308681  2018-03-15 18:39:00.000000
308623  2018-03-15 18:38:00.000000
308584  2018-03-15 18:37:00.000000
308530  2018-03-15 15:30:00.000000
308482  2018-03-15 15:29:00.000000
308436  2018-03-15 15:28:00.000000
308372  2018-03-15 15:27:00.000000
308316  2018-03-15 15:26:00.000000

Requests are sent every 0.3 seconds. Please help...

Edit:

I tried to insert the missing rows manually (psql). First, I checked which rows are missing:

crypto_trader_db=> select id, mts from candle where (pair_timeframe_id = 5) and mts >= '2018-03-18 07:21:00' ORDER BY mts;
   id   |          mts           
--------+------------------------
 308827 | 2018-03-18 07:21:00+00
 308854 | 2018-03-18 07:22:00+00
 308917 | 2018-03-18 07:23:00+00
 308977 | 2018-03-18 07:24:00+00
 309093 | 2018-03-18 07:26:00+00
 309141 | 2018-03-18 07:27:00+00
 309190 | 2018-03-18 07:28:00+00
 309234 | 2018-03-18 07:29:00+00
 309346 | 2018-03-18 07:31:00+00
 309401 | 2018-03-18 07:32:00+00
 309464 | 2018-03-18 07:33:00+00
 309519 | 2018-03-18 07:34:00+00
 309590 | 2018-03-18 07:35:00+00
 309643 | 2018-03-18 07:36:00+00
 309702 | 2018-03-18 07:37:00+00
 309762 | 2018-03-18 07:38:00+00
 309816 | 2018-03-18 07:39:00+00
(17 rows)


crypto_trader_db=> select id, mts from candle where (pair_timeframe_id = 5) and (mts = '2018-03-18 07:30:00');
 id | mts 
----+-----
(0 rows)


crypto_trader_db=> INSERT INTO candle (pair_timeframe_id, mts, open, close, high, low, volume) 
                    VALUES (5, '2018-03-18 07:30:00', 8035, 8038, 8048.8, 8024, 608.21251505);
ERROR:  duplicate key value violates unique constraint "candle_mts_84b62390_uniq"
DETAIL:  Key (mts)=(2018-03-18 07:30:00+00) already exists.


crypto_trader_db=> delete from candle where (pair_timeframe_id = 5) and (mts = '2018-03-18 07:30:00');
DELETE 0

Where is 07:30?

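The cause is the unique index on the mts field. The candle table has a foreign key to the pair_timeframe table through the pair_timeframe_id field. That means the combination (mts, pair_timeframe_id) must be unique, not mts alone. That was my mistake.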
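The effect of the too-narrow constraint can be reproduced in a small sketch. It is only an illustration under stated assumptions: SQLite (3.24 or newer, via Python's sqlite3 module) stands in for PostgreSQL because its upsert syntax is modeled on PostgreSQL's, the table is cut down to four columns, and pair_timeframe_id 7 is a hypothetical second pair that does not appear in the question. The helper name stored_rows is also made up for this example.

```python
import sqlite3

# Hypothetical feed: pair 7 writes the 15:40 candle first, then pair 5
# sends two updates for the same timestamp.
FEED = [
    (7, "2018-03-15 15:40:00", 1.0),
    (5, "2018-03-15 15:40:00", 8038.9),
    (5, "2018-03-15 15:40:00", 8040.0),
]


def stored_rows(unique_cols, where_clause=""):
    """Replay FEED against a fresh table and return what ends up stored."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE candle ("
        " id INTEGER PRIMARY KEY,"
        " pair_timeframe_id INTEGER NOT NULL,"
        " mts TEXT NOT NULL,"
        " close REAL NOT NULL,"
        f" UNIQUE ({unique_cols}))"
    )
    upsert = (
        "INSERT INTO candle (pair_timeframe_id, mts, close) "
        "VALUES (:pair, :mts, :close) "
        f"ON CONFLICT ({unique_cols}) "
        f"DO UPDATE SET close = excluded.close {where_clause}"
    )
    for pair, mts, close in FEED:
        conn.execute(upsert, {"pair": pair, "mts": mts, "close": close})
    rows = conn.execute(
        "SELECT pair_timeframe_id, mts, close FROM candle ORDER BY id"
    ).fetchall()
    conn.close()
    return rows


# Original layout: UNIQUE (mts) plus the WHERE filter on the pair.
old = stored_rows("mts", "WHERE candle.pair_timeframe_id = :pair")
# Corrected layout: UNIQUE (pair_timeframe_id, mts), no WHERE needed.
new = stored_rows("pair_timeframe_id, mts")

print(old)
print(new)
```

With the original layout, the rows for pair 5 conflict with the row owned by pair 7, the WHERE condition is false, and the update is skipped without an error, so pair 5 never gets its 15:40 candle. With the composite constraint each pair keeps its own row.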

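But the question remains open: what are the missing rows? A SELECT for them returns 0 rows, yet when I try to insert one I get a unique-constraint error. So I conclude that the rows do exist. But when I try to delete them, 0 rows are deleted... so I cannot change the missing rows; they seem to be blocked. I think this may be a small bug in PostgreSQL.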
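One explanation that fits the cause described above, without assuming a PostgreSQL bug: the SELECT and DELETE statements filter on pair_timeframe_id = 5, while the unique constraint covers mts alone. A row with the same timestamp stored under a different pair_timeframe_id would produce exactly these three results. The sketch below reproduces the pattern with SQLite standing in for PostgreSQL; the owning pair id 7 is a hypothetical value, since the question does not show which pair actually holds the 07:30 row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE candle ("
    " id INTEGER PRIMARY KEY,"
    " pair_timeframe_id INTEGER NOT NULL,"
    " mts TEXT NOT NULL UNIQUE)"  # unique on mts alone, as in the first schema
)

MTS = "2018-03-18 07:30:00"

# Hypothetical: some other pair (id 7) already owns this timestamp.
conn.execute("INSERT INTO candle (pair_timeframe_id, mts) VALUES (7, ?)", (MTS,))

# 1. SELECT filtered by pair 5 finds nothing.
filtered = conn.execute(
    "SELECT id, mts FROM candle WHERE pair_timeframe_id = 5 AND mts = ?", (MTS,)
).fetchall()

# 2. INSERT for pair 5 violates the unique constraint on mts.
try:
    conn.execute("INSERT INTO candle (pair_timeframe_id, mts) VALUES (5, ?)", (MTS,))
    insert_error = None
except sqlite3.IntegrityError as exc:
    insert_error = str(exc)

# 3. DELETE filtered by pair 5 removes nothing.
deleted = conn.execute(
    "DELETE FROM candle WHERE pair_timeframe_id = 5 AND mts = ?", (MTS,)
).rowcount

# Dropping the pair filter reveals the row that holds the timestamp.
unfiltered = conn.execute(
    "SELECT pair_timeframe_id, mts FROM candle WHERE mts = ?", (MTS,)
).fetchall()

print(filtered, insert_error, deleted, unfiltered)
```

On the real database, the equivalent check would be to rerun the SELECT for '2018-03-18 07:30:00' without the pair_timeframe_id condition and see which pair the row belongs to.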

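Also, when I try to upsert a row whose mts is not unique and the ON CONFLICT (mts) branch is taken, I do not get any error message from the UPSERT (though that may be an issue on the client side, in psycopg2 or the Django ORM).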

Now my table looks like this, and the UPSERT works correctly:

\d+ candle
                                                           Table "public.candle"
      Column       |           Type           |                      Modifiers                      | Storage | Stats target | Description 
-------------------+--------------------------+-----------------------------------------------------+---------+--------------+-------------
 id                | integer                  | not null default nextval('candle_id_seq'::regclass) | plain   |              | 
 mts               | timestamp with time zone | not null                                            | plain   |              | 
 open              | numeric(200,40)          | not null                                            | main    |              | 
 close             | numeric(200,40)          | not null                                            | main    |              | 
 high              | numeric(200,40)          | not null                                            | main    |              | 
 low               | numeric(200,40)          | not null                                            | main    |              | 
 volume            | numeric(200,40)          | not null                                            | main    |              | 
 pair_timeframe_id | integer                  | not null                                            | plain   |              | 
Indexes:
    "candle_pkey" PRIMARY KEY, btree (id)
    "candle_pair_timeframe_id_mts_ffec5f87_uniq" UNIQUE CONSTRAINT, btree (pair_timeframe_id, mts)
    "candle_mts_84b62390" btree (mts)
    "candle_pair_timeframe_id_3f7b76ce" btree (pair_timeframe_id)
Foreign-key constraints:
    "candle_pair_timeframe_id_3f7b76ce_fk_pair_timeframe_id" FOREIGN KEY (pair_timeframe_id) REFERENCES pair_timeframe(id) DEFERRABLE INITIALLY DEFERRED

The request is now built as follows:

f = "(%s, '%s', %s, %s, %s, %s, %s)" % (pairs_timeframes_id, d["mts"], d["open"], d["close"], d["high"], d["low"], d["volume"])
with connection.cursor() as cursor:
    g = '''
                            INSERT INTO candle (
                              pair_timeframe_id, mts, open, close, high, low, volume
                              ) VALUES %s
                              ON CONFLICT (pair_timeframe_id, mts)
                              DO UPDATE SET
                                    open = EXCLUDED.open,
                                    close = EXCLUDED.close,
                                    high = EXCLUDED.high,
                                    low = EXCLUDED.low,
                                    volume = EXCLUDED.volume;
    ''' % f
    cursor.execute(g)
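The statement above is assembled by interpolating values into the SQL text. As an alternative sketch, the same upsert can be built with %s placeholders and a separate parameter list, which Django's cursor.execute(sql, params) accepts, so quoting is left to the database driver. The helper name build_upsert and the list-of-dicts input shape are assumptions made for this example; only the column names and the conflict target come from the post.

```python
UPSERT_SQL = """
    INSERT INTO candle (
      pair_timeframe_id, mts, open, close, high, low, volume
    ) VALUES {placeholders}
    ON CONFLICT (pair_timeframe_id, mts)
    DO UPDATE SET
        open = EXCLUDED.open,
        close = EXCLUDED.close,
        high = EXCLUDED.high,
        low = EXCLUDED.low,
        volume = EXCLUDED.volume;
"""

ROW_PLACEHOLDER = "(%s, %s, %s, %s, %s, %s, %s)"


def build_upsert(pairs_timeframes_id, candles):
    """Return (sql, params) suitable for cursor.execute(sql, params).

    `candles` is assumed to be a list of dicts with the keys
    mts, open, close, high, low and volume.
    """
    if not candles:
        raise ValueError("at least one candle is required")
    # One placeholder group per row; the values travel separately in params.
    sql = UPSERT_SQL.format(
        placeholders=", ".join([ROW_PLACEHOLDER] * len(candles))
    )
    params = []
    for d in candles:
        params.extend(
            [pairs_timeframes_id, d["mts"], d["open"], d["close"],
             d["high"], d["low"], d["volume"]]
        )
    return sql, params


sql, params = build_upsert(
    5,
    [{"mts": "2018-03-15 15:40:00", "open": 8020, "close": 8040,
      "high": 8040, "low": 8020, "volume": 59.02972643}],
)
print(sql)
print(params)

# Inside Django this would be used roughly as:
#     with connection.cursor() as cursor:
#         cursor.execute(sql, params)
```

If several rows are sent in one statement, they should not repeat the same (pair_timeframe_id, mts) key, because PostgreSQL rejects an ON CONFLICT DO UPDATE command that would affect the same row twice.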