更快的 mongoimport,与气流并行?

faster mongoimport, in parallel in airflow?

tl;dr: 数据插入我们的 mongodb atlas 集群的速度似乎有限制。并行插入数据不会加快速度。我们怎样才能加快速度?我们唯一的选择是获得更大的 mongodb atlas 集群和更多 Write IOPS 吗?写入 IOPS 是多少?

我们每天用 atlas 将 >10GB+ 的数据替换并重新插入到我们的 mongodb 集群中。我们有以下 2 bash 命令,包装在 python 函数中以帮助参数化我们在 airflow 中与 BashOperator 一起使用的命令:

将单个 JSON 上传到 mongo 集群

def mongoimport_file(mongo_table, file_name):
    # upload single file from /tmp directory into Mongo cluster
    # cleanup: remove .json in /tmp at the end
    uri = 'mongodb+srv://<user>:<pass>@our-cluster.dwxnd.gcp.mongodb.net/ourdb'
    return f"""
        echo INSERT \
        && mongoimport --uri "{uri}" --collection {mongo_table} --drop --file /tmp/{file_name}.json \
        && echo AND REMOVE LOCAL FILEs... \
        && rm /tmp/{file_name}.json
    """

将 JSON 的目录上传到 mongo 集群

def mongoimport_dir(mongo_table, dir_name):
    # upload directory of JSONs into mongo cluster
    # cleanup: remove directory at the end
    uri = 'mongodb+srv://<user>:<pass>@our-cluster.dwxnd.gcp.mongodb.net/ourdb'
    return f"""
        echo INSERT \
        && cat /tmp/{dir_name}/*.json | mongoimport --uri "{uri}" --collection {mongo_table} --drop \
        && echo AND REMOVE LOCAL FILEs... \
        && rm -rf /tmp/{dir_name}
    """

使用 BashOperator 在 airflow 中调用:

import_to_mongo = BashOperator(
    task_id=f'mongo_import_v0__{this_table}',
    bash_command=mongoimport_file(mongo_table = 'tname', file_name = 'fname')
)

这两种方法都有效,但性能不同:

目前没有与 ** mongoimport_dir** 的并行化,实际上它比只导入单个文件要慢。

  1. 在 airflow 中,是否可以并行化我们目录的 mongoimport 100 JSONs,以实现大幅加速?如果有使用 python 的 pymongo 的并行解决方案无法使用 mongoimport 完成,我们很乐意切换(尽管我们强烈希望避免加载这些 JSONs 进入内存)。
  2. 目前导入到 mongo 的瓶颈是什么?是 (a) 我们服务器/docker 容器中的 CPU,还是 (b) 我们的 mongo 集群配置(集群 RAM,或集群 vCPU,或集群最大连接数,或集群读/写 IOPS) (这些甚至是什么?))。作为参考,这是我们的 mongo 配置。我假设我们可以通过获得更大的集群来加快导入速度,但是 mongodb atlas 很快就会变得非常昂贵。 0.5 个 vCPU 听起来并不多,但这已经让我们每月花费 150 美元...

首先“目前导入mongo的瓶颈是什么?”和“是(a)CPUs in our server / docker container” - 不要相信任何人会根据您提供的屏幕截图告诉您答案。

Atlas 具有监控工具,可以告诉您瓶颈是在 CPU、RAM、磁盘还是网络或数据库端的任意组合:

在客户端(气流)- 请使用主机的系统监视器 OS 来回答问题。 docker 内的测试盘 I/O。主机 OS 和 docker 存储驱动程序的某些组合在过去表现很差。

接下来,“偶写IOPS是什么”-随机 每秒写操作 https://cloud.google.com/compute/docs/disks/performance

IOPS 计算因云提供商而异。试用 AWS 和 Azure 比较成本与速度。 AWS 上的 M10 为您提供 2 vCPU,但我再次怀疑您能否在供应商之间比较它们 1:1。好处是它是按需的,测试和删除集群的成本不到一杯咖啡。

最后,“如果有使用 python 的 pymongo 的并行解决方案”——我对此表示怀疑。 mongoimport 使用每批 100,000 个文档,因此基本上它发送它的速度与接收方消耗流的速度一样快。客户端的限制可能是:网络、磁盘、CPU。如果是网络或磁盘,并行导入不会改善任何事情。如果 mongo 导入使用单个 CPU 并且它是限制因素,则多核系统可以从并行导入中受益。默认情况下 mongoimport 使用所有可用的 CPU:https://github.com/mongodb/mongo-tools/blob/cac1bfbae193d6ba68abb764e613b08285c6f62d/common/options/options.go#L302。你很难用 pymongo.

打败它