mongoimport JSON from Google Airflow 任务中的 Cloud Storage

mongoimport JSON from Google Cloud Storage in an Airflow task

似乎将数据从 GCS 移动到 MongoDB 并不常见,因为这方面的文档不多。我们有以下任务,我们将其作为 python_callable 传递给 Python 运算符 - 此任务将数据从 BigQuery 移动到 GCS 作为 JSON:

def transfer_gcs_to_mongodb(table_name):
    # connect
    client = bigquery.Client()
    bucket_name = "our-gcs-bucket"
    project_id = "ourproject"
    dataset_id = "ourdataset"
        
    destination_uri = f'gs://{bucket_name}/{table_name}.json'
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)

    configuration = bigquery.job.ExtractJobConfig()
    configuration.destination_format = 'NEWLINE_DELIMITED_JSON'

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=configuration,
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))

此任务正在成功将数据导入 GCS。但是,我们现在陷入了如何正确 运行 mongoimport 以将此数据输入 MongoDB 的问题。特别是,好像mongoimport不能指向GCS中的文件,而是要先下载到本地,再导入到MongoDB.

这应该如何在 Airflow 中完成?我们是否应该编写一个 shell 脚本从 GCS 下载 JSON,然后使用正确的 uri 和所有正确的标志 运行s mongoimport?还是有另一种方法可以在我们缺少的 Airflow 中 运行 mongoimport

您无需编写 shell 脚本即可从 GCS 下载。您可以简单地使用 MongoHook 的 GCSToLocalFilesystemOperator then you can open the file and write it to mongo using the insert_many 函数。

我没有测试它,但它应该是这样的:

mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
    file_data = json.load(f)
mongo.insert_many(file_data)

这是一个管道:BigQuery -> GCS -> 本地文件系统 -> MongoDB。

如果愿意,您也可以在内存中执行此操作:BigQuery -> GCS -> MongoDB。