将 JSON 从谷歌云存储上传到 bigquery 时出现混乱

Confusion when uploading a JSON from googlecloud storage to bigquery

您好,这是一个由两部分组成的问题

1) 目前我正在尝试通过 python 脚本将文件从 google 云存储上传到 bigquery。我正在尝试按照 google 帮助站点给出的步骤进行操作。

https://cloud.google.com/bigquery/docs/loading-data-cloud-storage#bigquery-import-gcs-file-python

def load_data_from_gcs(dataset_name, table_name, source):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    job_name = str(uuid.uuid4())

    job = bigquery_client.load_table_from_storage(
        job_name, table, source)

    job.begin()

    wait_for_job(job)

    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, dataset_name, table_name))

我不确定在 "load_data_from_gcs" 的第一行输入什么,因为在 google 云中没有 tables 它是 JSON 文件我是试图上传。 "table" 部分是我正在尝试创建的 table 的名称,还是在谈论存储桶,因为没有部分指定我想从哪个存储桶中提取。

这是我目前的代码。

import json
import argparse
import time
import uuid

from google.cloud import bigquery
# from google.cloud import storage

def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
    bigquery_client = bigquery.Client('dataworks-356fa')
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table(table_name)
    job_name = str(uuid.uuid4())

    job = bigquery_client.load_table_from_storage(
        job_name, table, source)

    job.begin()

    wait_for_job(job)

    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, dataset_name, table_name))

第 2 部分) 我希望这个脚本每周 运行 并且能够删除旧的 table 并创建一个新的或仅在非重复数据中进行过滤。哪个更容易。

感谢您的帮助。

不确定您遇到了什么问题,但是将数据从文件从 GCS 加载到 BigQuery 正是您已经在做的事情。

如果您的 table 具有此架构:

[{"name": "id", "type": "INT64"}, {"name": "name", "type": "STRING"}]

如果您在 GCS 中有此文件(例如位于 "gs://bucket/json_data.json"):

{"id": 1, "name": "test1"}
{"id": 2, "name": "test2"}

您现在只需要设置 job 对象来处理 JSON 文件作为输入,如下所示:

def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
    bigquery_client = bigquery.Client('dataworks-356fa')
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table(table_name)
    job_name = str(uuid.uuid4())

    job = bigquery_client.load_table_from_storage(
        job_name, table, "gs://bucket/json_data.json")

    job.source_format = 'NEWLINE_DELIMITED_JSON'
    job.begin()

就是这样。

(如果您有 CSV 文件,则必须相应地设置 job 对象)。

至于第二个问题,其实就是尝试不同的方法,看看哪种方法最适合你。

要删除 table,您只需 运行:

table.delete()

要从 table 中删除重复数据,一种可能是编写一个查询来删除重复并将结果保存到相同的 table。类似于:

query_job = bigquery_client.run_async_query(query=your_query, job_name=job_name)
query_job.destination = Table object
query_job.write_disposition = 'WRITE_TRUNCATE'
query_job.begin()