将 JSON 从谷歌云存储上传到 bigquery 时出现混乱
Confusion when uploading a JSON from googlecloud storage to bigquery
您好,这是一个由两部分组成的问题
1) 目前我正在尝试通过 python 脚本将文件从 google 云存储上传到 bigquery。我正在尝试按照 google 帮助站点给出的步骤进行操作。
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage#bigquery-import-gcs-file-python
def load_data_from_gcs(dataset_name, table_name, source):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset_name)
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, source)
job.begin()
wait_for_job(job)
print('Loaded {} rows into {}:{}.'.format(
job.output_rows, dataset_name, table_name))
我不确定在 "load_data_from_gcs" 的第一行输入什么,因为在 google 云中没有 tables 它是 JSON 文件我是试图上传。 "table" 部分是我正在尝试创建的 table 的名称,还是在谈论存储桶,因为没有部分指定我想从哪个存储桶中提取。
这是我目前的代码。
import json
import argparse
import time
import uuid
from google.cloud import bigquery
# from google.cloud import storage
def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
bigquery_client = bigquery.Client('dataworks-356fa')
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, source)
job.begin()
wait_for_job(job)
print('Loaded {} rows into {}:{}.'.format(
job.output_rows, dataset_name, table_name))
第 2 部分)
我希望这个脚本每周 运行 并且能够删除旧的 table 并创建一个新的或仅在非重复数据中进行过滤。哪个更容易。
感谢您的帮助。
不确定您遇到了什么问题,但是将数据从文件从 GCS 加载到 BigQuery 正是您已经在做的事情。
如果您的 table 具有此架构:
[{"name": "id", "type": "INT64"}, {"name": "name", "type": "STRING"}]
如果您在 GCS 中有此文件(例如位于 "gs://bucket/json_data.json"):
{"id": 1, "name": "test1"}
{"id": 2, "name": "test2"}
您现在只需要设置 job
对象来处理 JSON 文件作为输入,如下所示:
def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
bigquery_client = bigquery.Client('dataworks-356fa')
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, "gs://bucket/json_data.json")
job.source_format = 'NEWLINE_DELIMITED_JSON'
job.begin()
就是这样。
(如果您有 CSV 文件,则必须相应地设置 job
对象)。
至于第二个问题,其实就是尝试不同的方法,看看哪种方法最适合你。
要删除 table,您只需 运行:
table.delete()
要从 table 中删除重复数据,一种可能是编写一个查询来删除重复并将结果保存到相同的 table。类似于:
query_job = bigquery_client.run_async_query(query=your_query, job_name=job_name)
query_job.destination = Table object
query_job.write_disposition = 'WRITE_TRUNCATE'
query_job.begin()
您好,这是一个由两部分组成的问题
1) 目前我正在尝试通过 python 脚本将文件从 google 云存储上传到 bigquery。我正在尝试按照 google 帮助站点给出的步骤进行操作。
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage#bigquery-import-gcs-file-python
def load_data_from_gcs(dataset_name, table_name, source):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset_name)
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, source)
job.begin()
wait_for_job(job)
print('Loaded {} rows into {}:{}.'.format(
job.output_rows, dataset_name, table_name))
我不确定在 "load_data_from_gcs" 的第一行输入什么,因为在 google 云中没有 tables 它是 JSON 文件我是试图上传。 "table" 部分是我正在尝试创建的 table 的名称,还是在谈论存储桶,因为没有部分指定我想从哪个存储桶中提取。
这是我目前的代码。
import json
import argparse
import time
import uuid
from google.cloud import bigquery
# from google.cloud import storage
def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
bigquery_client = bigquery.Client('dataworks-356fa')
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, source)
job.begin()
wait_for_job(job)
print('Loaded {} rows into {}:{}.'.format(
job.output_rows, dataset_name, table_name))
第 2 部分) 我希望这个脚本每周 运行 并且能够删除旧的 table 并创建一个新的或仅在非重复数据中进行过滤。哪个更容易。
感谢您的帮助。
不确定您遇到了什么问题,但是将数据从文件从 GCS 加载到 BigQuery 正是您已经在做的事情。
如果您的 table 具有此架构:
[{"name": "id", "type": "INT64"}, {"name": "name", "type": "STRING"}]
如果您在 GCS 中有此文件(例如位于 "gs://bucket/json_data.json"):
{"id": 1, "name": "test1"}
{"id": 2, "name": "test2"}
您现在只需要设置 job
对象来处理 JSON 文件作为输入,如下所示:
def load_data_from_gcs('dataworks-356fa', table_name, 'pullnupload.json'):
bigquery_client = bigquery.Client('dataworks-356fa')
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = bigquery_client.load_table_from_storage(
job_name, table, "gs://bucket/json_data.json")
job.source_format = 'NEWLINE_DELIMITED_JSON'
job.begin()
就是这样。
(如果您有 CSV 文件,则必须相应地设置 job
对象)。
至于第二个问题,其实就是尝试不同的方法,看看哪种方法最适合你。
要删除 table,您只需 运行:
table.delete()
要从 table 中删除重复数据,一种可能是编写一个查询来删除重复并将结果保存到相同的 table。类似于:
query_job = bigquery_client.run_async_query(query=your_query, job_name=job_name)
query_job.destination = Table object
query_job.write_disposition = 'WRITE_TRUNCATE'
query_job.begin()