mongoimport JSON from Google Airflow 任务中的 Cloud Storage
mongoimport JSON from Google Cloud Storage in an Airflow task
似乎将数据从 GCS 移动到 MongoDB 并不常见,因为这方面的文档不多。我们有以下任务,我们将其作为 python_callable
传递给 Python 运算符 - 此任务将数据从 BigQuery 移动到 GCS 作为 JSON:
def transfer_gcs_to_mongodb(table_name):
# connect
client = bigquery.Client()
bucket_name = "our-gcs-bucket"
project_id = "ourproject"
dataset_id = "ourdataset"
destination_uri = f'gs://{bucket_name}/{table_name}.json'
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_name)
configuration = bigquery.job.ExtractJobConfig()
configuration.destination_format = 'NEWLINE_DELIMITED_JSON'
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=configuration,
location="US",
) # API request
extract_job.result() # Waits for job to complete.
print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))
此任务正在成功将数据导入 GCS。但是,我们现在陷入了如何正确 运行 mongoimport
以将此数据输入 MongoDB 的问题。特别是,好像mongoimport
不能指向GCS中的文件,而是要先下载到本地,再导入到MongoDB.
这应该如何在 Airflow 中完成?我们是否应该编写一个 shell 脚本从 GCS 下载 JSON,然后使用正确的 uri
和所有正确的标志 运行s mongoimport
?还是有另一种方法可以在我们缺少的 Airflow 中 运行 mongoimport
?
您无需编写 shell 脚本即可从 GCS 下载。您可以简单地使用 MongoHook 的 GCSToLocalFilesystemOperator then you can open the file and write it to mongo using the insert_many 函数。
我没有测试它,但它应该是这样的:
mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
file_data = json.load(f)
mongo.insert_many(file_data)
这是一个管道:BigQuery -> GCS -> 本地文件系统 -> MongoDB。
如果愿意,您也可以在内存中执行此操作:BigQuery -> GCS -> MongoDB。
似乎将数据从 GCS 移动到 MongoDB 并不常见,因为这方面的文档不多。我们有以下任务,我们将其作为 python_callable
传递给 Python 运算符 - 此任务将数据从 BigQuery 移动到 GCS 作为 JSON:
def transfer_gcs_to_mongodb(table_name):
# connect
client = bigquery.Client()
bucket_name = "our-gcs-bucket"
project_id = "ourproject"
dataset_id = "ourdataset"
destination_uri = f'gs://{bucket_name}/{table_name}.json'
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_name)
configuration = bigquery.job.ExtractJobConfig()
configuration.destination_format = 'NEWLINE_DELIMITED_JSON'
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=configuration,
location="US",
) # API request
extract_job.result() # Waits for job to complete.
print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))
此任务正在成功将数据导入 GCS。但是,我们现在陷入了如何正确 运行 mongoimport
以将此数据输入 MongoDB 的问题。特别是,好像mongoimport
不能指向GCS中的文件,而是要先下载到本地,再导入到MongoDB.
这应该如何在 Airflow 中完成?我们是否应该编写一个 shell 脚本从 GCS 下载 JSON,然后使用正确的 uri
和所有正确的标志 运行s mongoimport
?还是有另一种方法可以在我们缺少的 Airflow 中 运行 mongoimport
?
您无需编写 shell 脚本即可从 GCS 下载。您可以简单地使用 MongoHook 的 GCSToLocalFilesystemOperator then you can open the file and write it to mongo using the insert_many 函数。
我没有测试它,但它应该是这样的:
mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
file_data = json.load(f)
mongo.insert_many(file_data)
这是一个管道:BigQuery -> GCS -> 本地文件系统 -> MongoDB。
如果愿意,您也可以在内存中执行此操作:BigQuery -> GCS -> MongoDB。