Oracle 到大查询表

Oracle to Big Query Tables

我是大查询的新手,正在学习它。我有一个要求,我必须将近 300 个表从 oracle 源加载到 Big Query 暂存表。加载数据的推荐方法是什么?我知道我可以为此使用数据流,但我是否必须为其创建 300 个数据流任务或创建单个作业来迭代它?请分享您的经验和不同的方法。非常感谢。

此致, 文卡特.

根据我的经验,我们想将我们的数据仓库迁移到 bigquery,我没有使用数据流或任何工具 我只是将 tables 导出到 csv 文件,然后使用 python 代码遍历文件并将它们上传到 bigquery https://cloud.google.com/bigquery/docs/loading-data-local#python

或者您可以将它们上传到 gcs,然后再上传到 bigquery 如果是日常操作 我认为维护一个迭代 tables 列表的单个代码提取它们并将它们附加到 bigquery tables 比创建 300 个任务

更容易

更新:

使用 pandas-gbq 将数据从 oracle 读取到 bigquery 的示例代码:

import cx_Oracle
from sqlalchemy import create_engine

engine = create_engine('oracle://user:password@host_or_scan_address:1521/ORACLE_SERVIVE_NAME')

results = pd.read_sql('select * from table_name', engine,chunk_size= 5000)
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
for result in results:
     result.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
                        table_schema=schema, if_exists=if_exists)

如果你想将数据作为一个块加载,你可以删除 chunk_size 参数,但是如果 table 很大

,这可能会消耗内存


results = pd.read_sql('select * from table_name')
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
results.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
                        table_schema=schema, if_exists=if_exists)

我的建议是提取文件中的 Oracle table 内容(例如 CSV 格式)。将文件复制到 Cloud Storage。然后将它们加载到 BigQuery 中。

如果你想做的转换是SQL.

,数据流就没用了(昂贵,效率低,需要更多时间)

但是,如果您需要请求外部 API(用于数据转换,例如 ML API)或者如果您希望将数据下沉到除 BigQuery(Firestore,大 Table、云 SQL、...),数据流是正确的工具

编辑

为了更深入地了解细节,我假设 table 位于同一数据集中。那么,代码就简单了

def hello_gcs_generic(data, context):

    client = bigquery.Client()
    dataset_id = 'my_dataset'

    bucket = data['bucket']
    path = data['name']

    table_name = path[path.rfind('/')+1:path.rfind(('.'))]

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date"  # Name of the column to use for partitioning.
        ),
        source_format=bigquery.SourceFormat.CSV
    )

    uri = "gs://{}/{}".format(bucket,path)

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_name), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))
    
    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

此处,对存储桶中的每个文件都调用 Cloud Functions。因此,如果同时drop 300个文件,会触发300个函数,并行处理。

几点:

  • Table 名称等于文件名。
  • 默认情况下,行为是写入追加(将数据添加到 table。
  • 我强烈建议您在字段上对 table 进行分区,这里是日期字段。
  • 加载作业通常很快。如果超过 9 分钟,请使用 Cloud 运行(限制为 15 分钟)或不要等待加载结束(删除此 load_job.result()
  • CSV 以第 1 行开头。不是强制性的,但在模式自动检测的情况下,这有助于很好地命名列。

注意:我假设所有放入 Cloud Storage 的文件都必须集成到 BigQuery 中。如果没有,您可以按照 one of my articles

中的说明添加过滤器