Pymongo vs psycopg3: orders of magnitude difference for writes?

I have an application that collects cryptocurrency prices from decentralized exchanges (non-commercial; the goal is mostly to learn some database technologies with real data). I got it working with both MongoDB and PostgreSQL. However, I'm seeing a huge difference in write performance, in favor of MongoDB. I'm talking about several orders of magnitude. I know MongoDB is a NoSQL database and is advertised as handling this kind of workload more efficiently, but I'm wondering whether my Postgres implementation is missing something. I'll describe below how I implemented the logic, and I'll try to provide comparable metrics (as best I can).

To talk to the databases, I use psycopg3 for the Postgres implementation and pymongo for the MongoDB implementation.

Here is the data structure I write to the databases:

from __future__ import annotations

from typing import NamedTuple

import pendulum


class PricingInfo(NamedTuple):
    blockchain_name: str
    dex_name: str
    pair_address: str
    token0_symbol: str
    token1_symbol: str
    token0_address: str
    token1_address: str
    raw_reserve0: int
    raw_reserve1: int
    reserve0: float
    reserve1: float
    mid_price: float
    datetime: pendulum.DateTime
    inverted: bool = False

    @classmethod
    def from_dict(cls, doc: dict) -> PricingInfo:
        doc.pop("_id", None)
        return cls(**doc)

    def to_dict(self) -> dict:
        return self._asdict()
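
For reference, the round trip through to_dict/from_dict is lossless, since from_dict only strips the _id field Mongo adds. A quick example; every value below is made up purely for illustration:

# All values are placeholders, not real on-chain data.
info = PricingInfo(
    blockchain_name="avalanche",
    dex_name="pangolin",
    pair_address="0xPAIR",        # placeholder address
    token0_symbol="WAVAX",
    token1_symbol="USDC",
    token0_address="0xTOKEN0",    # placeholder address
    token1_address="0xTOKEN1",    # placeholder address
    raw_reserve0=10**18,
    raw_reserve1=25 * 10**6,
    reserve0=1.0,
    reserve1=25.0,
    mid_price=25.0,
    datetime=pendulum.now("UTC"),
)

assert PricingInfo.from_dict(info.to_dict()) == info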

This is identical for both implementations. I have a few hundred of these to write to the database every second. Here is what I do for Postgres:

def register_prices(self, prices: list[PricingInfo]) -> None:
    query = """
    insert into prices (
        blockchain_name,
        dex_name,
        pair_address,
        token0_address,
        token1_address,
        raw_reserve0,
        raw_reserve1,
        reserve0,
        reserve1,
        mid_price,
        datetime
    )
    values (
        %(blockchain_name)s,
        %(dex_name)s,
        %(pair_address)s,
        %(token0_address)s,
        %(token1_address)s,
        %(raw_reserve0)s,
        %(raw_reserve1)s,
        %(reserve0)s,
        %(reserve1)s,
        %(mid_price)s,
        %(datetime)s
    )
    """

    keys_to_keep = {
        "blockchain_name",
        "dex_name",
        "pair_address",
        "token0_address",
        "token1_address",
        "raw_reserve0",
        "raw_reserve1",
        "reserve0",
        "reserve1",
        "mid_price",
        "datetime",
    }

    with psycopg.connect(self.db_uri) as conn:
        with conn.cursor() as cur:
            start = time.perf_counter()
            if len(prices) == 1:
                cur.execute(
                    query,
                    {
                        k: v
                        for k, v in prices[0].to_dict().items()
                        if k in keys_to_keep
                    },
                )
            elif len(prices) > 1:
                cur.executemany(
                    query,
                    [
                        {k: v for k, v in p.to_dict().items() if k in keys_to_keep}
                        for p in prices
                    ],
                )

        conn.commit()
    delta = time.perf_counter() - start

    if self.dex_name in {"pangolin", "trader_joe"}:
        logger.warning(f"Inserting {len(prices)}")
        logger.warning(f"Inserting prices took {delta} seconds")

Here is my table definition:

create table prices (
    id serial primary key,
    blockchain_name varchar(100) not null,
    dex_name varchar(100) not null,
    raw_reserve0 decimal not null,
    raw_reserve1 decimal not null,
    reserve0 decimal not null,
    reserve1 decimal not null,
    mid_price decimal not null,
    datetime timestamp with time zone not null,
    pair_address varchar(50) not null,
    token0_address varchar(50) not null,
    token1_address varchar(50) not null,
    foreign key (blockchain_name, dex_name, pair_address) references pairs (blockchain_name, dex_name, pair_address),
    foreign key (blockchain_name, dex_name, token0_address) references tokens (blockchain_name, dex_name, address),
    foreign key (blockchain_name, dex_name, token1_address) references tokens (blockchain_name, dex_name, address)
);
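
Note that every insert also has to validate three composite foreign keys against pairs and tokens, and every commit waits for a WAL flush. A rough way to see how much of the gap these account for is to time the same batch against an unlogged clone of the table; this is a diagnostic sketch only, and prices_bench is a made-up scratch table:

import time

import psycopg

def time_bench_insert(db_uri: str, query: str, rows: list[dict]) -> float:
    """Time one executemany batch against an unlogged, FK-free clone of prices."""
    with psycopg.connect(db_uri) as conn:
        with conn.cursor() as cur:
            # LIKE copies the columns, not-null constraints and (with
            # INCLUDING DEFAULTS) the id sequence default, but never the
            # foreign keys; UNLOGGED additionally skips WAL for this table.
            cur.execute(
                "create unlogged table if not exists prices_bench"
                " (like prices including defaults)"
            )
            start = time.perf_counter()
            # Same INSERT as register_prices, retargeted at the clone.
            cur.executemany(query.replace("into prices", "into prices_bench"), rows)
            conn.commit()
            return time.perf_counter() - start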

And for MongoDB:

def register_prices(self, prices: list[PricingInfo]) -> None:

    start = time.perf_counter()

    prices_table = self._db["prices"]
    prices_table.insert_many(price.to_dict() for price in prices)

    delta = time.perf_counter() - start

    if self.dex_name in {"pangolin", "trader_joe"}:
        logger.warning(f"Inserting {len(prices)}")
        logger.warning(f"Inserting prices took {delta} seconds")

The application runs in exactly the same way against both databases. There is one small difference in the Postgres path, where the data dicts need slight reshaping to fit the schema (I do some normalization there), but since I only have ~600 dicts to reshape per write, I don't believe that is the bottleneck. In both cases, 8 processes write to the database concurrently.

For Postgres, I get these metrics:

Inserting 587
Inserting prices took 1.175270811014343 seconds
Inserting 611
Inserting prices took 0.3126116280036513 seconds

And for Mongo:

Inserting 588
Inserting prices took 0.03671051503624767 seconds
Inserting 612
Inserting prices took 0.032324473024345934 seconds

These timings are fairly stable: around 1 s and 300 ms for Postgres, and around 30 ms for Mongo. It is quite strange that Postgres shows two distinct write times for roughly the same amount of data. Still, even in its best case, Postgres is 10x slower than Mongo.
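
Part of the gap may be durability semantics rather than driver overhead: a Postgres commit waits for the WAL to be flushed to disk, while, as far as I understand, MongoDB's default write concern acknowledges insert_many once the primary has applied the writes in memory, with journaling happening in the background. A sketch for a more like-for-like comparison (session-scoped setting; a crash can lose the most recent transactions, but cannot corrupt the database):

import psycopg

def register_prices_relaxed(db_uri: str, query: str, rows: list[dict]) -> None:
    with psycopg.connect(db_uri) as conn:
        with conn.cursor() as cur:
            # Commits no longer wait for the WAL fsync, which is closer to
            # MongoDB's default acknowledgment behaviour.
            cur.execute("set synchronous_commit to off")
            cur.executemany(query, rows)
        conn.commit()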

Additional notes:

Am I doing something wrong with Postgres?

The fastest way to bulk-load data into Postgres is usually the COPY command. In psycopg3 this is available via the procedures shown here: psycopg3 COPY. One caveat is that COPY is all-or-nothing: either all of the data is imported, or an error means none of it is.
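
A minimal sketch of that approach with psycopg3's cursor.copy() (the column list matches the table above; write_row handles quoting and encoding):

import psycopg

COLUMNS = (
    "blockchain_name", "dex_name", "pair_address", "token0_address",
    "token1_address", "raw_reserve0", "raw_reserve1", "reserve0",
    "reserve1", "mid_price", "datetime",
)

def copy_prices(db_uri: str, rows: list[dict]) -> None:
    stmt = f"copy prices ({', '.join(COLUMNS)}) from stdin"
    with psycopg.connect(db_uri) as conn:
        with conn.cursor() as cur:
            # cursor.copy() streams all rows over a single COPY command:
            # one round trip for the whole batch instead of one per row.
            with cur.copy(stmt) as copy:
                for row in rows:
                    copy.write_row(tuple(row[c] for c in COLUMNS))
        conn.commit()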