Oracle 到大查询表
Oracle to Big Query Tables
我是大查询的新手,正在学习它。我有一个要求,我必须将近 300 个表从 oracle 源加载到 Big Query 暂存表。加载数据的推荐方法是什么?我知道我可以为此使用数据流,但我是否必须为其创建 300 个数据流任务或创建单个作业来迭代它?请分享您的经验和不同的方法。非常感谢。
此致,
文卡特.
根据我的经验,我们想将我们的数据仓库迁移到 bigquery,我没有使用数据流或任何工具
我只是将 tables 导出到 csv 文件,然后使用 python 代码遍历文件并将它们上传到 bigquery
https://cloud.google.com/bigquery/docs/loading-data-local#python
或者您可以将它们上传到 gcs,然后再上传到 bigquery
如果是日常操作
我认为维护一个迭代 tables 列表的单个代码提取它们并将它们附加到 bigquery tables 比创建 300 个任务
更容易
更新:
使用 pandas-gbq 将数据从 oracle 读取到 bigquery 的示例代码:
import cx_Oracle
from sqlalchemy import create_engine
engine = create_engine('oracle://user:password@host_or_scan_address:1521/ORACLE_SERVIVE_NAME')
results = pd.read_sql('select * from table_name', engine,chunk_size= 5000)
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
for result in results:
result.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
table_schema=schema, if_exists=if_exists)
如果你想将数据作为一个块加载,你可以删除 chunk_size 参数,但是如果 table 很大
,这可能会消耗内存
results = pd.read_sql('select * from table_name')
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
results.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
table_schema=schema, if_exists=if_exists)
我的建议是提取文件中的 Oracle table 内容(例如 CSV 格式)。将文件复制到 Cloud Storage。然后将它们加载到 BigQuery 中。
- 如果您要执行数据清理,您可以 运行 对加载的原始数据进行 SQL 查询,并将结果存储到新的 table.
- 如果您必须重复此操作,trigger a Cloud Function which load the file into BigQuery, on Google Cloud Storage event. You can also create scheduled queries 用于 运行ning 数据清理。
如果你想做的转换是SQL.
,数据流就没用了(昂贵,效率低,需要更多时间)
但是,如果您需要请求外部 API(用于数据转换,例如 ML API)或者如果您希望将数据下沉到除 BigQuery(Firestore,大 Table、云 SQL、...),数据流是正确的工具
编辑
为了更深入地了解细节,我假设 table 位于同一数据集中。那么,代码就简单了
def hello_gcs_generic(data, context):
client = bigquery.Client()
dataset_id = 'my_dataset'
bucket = data['bucket']
path = data['name']
table_name = path[path.rfind('/')+1:path.rfind(('.'))]
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="date" # Name of the column to use for partitioning.
),
source_format=bigquery.SourceFormat.CSV
)
uri = "gs://{}/{}".format(bucket,path)
load_job = client.load_table_from_uri(
uri, dataset_ref.table(table_name), job_config=job_config
) # API request
print("Starting job {}".format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print("Job finished.")
此处,对存储桶中的每个文件都调用 Cloud Functions。因此,如果同时drop 300个文件,会触发300个函数,并行处理。
几点:
- Table 名称等于文件名。
- 默认情况下,行为是写入追加(将数据添加到 table。
- 我强烈建议您在字段上对 table 进行分区,这里是日期字段。
- 加载作业通常很快。如果超过 9 分钟,请使用 Cloud 运行(限制为 15 分钟)或不要等待加载结束(删除此
load_job.result()
)
- CSV 以第 1 行开头。不是强制性的,但在模式自动检测的情况下,这有助于很好地命名列。
注意:我假设所有放入 Cloud Storage 的文件都必须集成到 BigQuery 中。如果没有,您可以按照 one of my articles
中的说明添加过滤器
我是大查询的新手,正在学习它。我有一个要求,我必须将近 300 个表从 oracle 源加载到 Big Query 暂存表。加载数据的推荐方法是什么?我知道我可以为此使用数据流,但我是否必须为其创建 300 个数据流任务或创建单个作业来迭代它?请分享您的经验和不同的方法。非常感谢。
此致, 文卡特.
根据我的经验,我们想将我们的数据仓库迁移到 bigquery,我没有使用数据流或任何工具 我只是将 tables 导出到 csv 文件,然后使用 python 代码遍历文件并将它们上传到 bigquery https://cloud.google.com/bigquery/docs/loading-data-local#python
或者您可以将它们上传到 gcs,然后再上传到 bigquery 如果是日常操作 我认为维护一个迭代 tables 列表的单个代码提取它们并将它们附加到 bigquery tables 比创建 300 个任务
更容易更新:
使用 pandas-gbq 将数据从 oracle 读取到 bigquery 的示例代码:
import cx_Oracle
from sqlalchemy import create_engine
engine = create_engine('oracle://user:password@host_or_scan_address:1521/ORACLE_SERVIVE_NAME')
results = pd.read_sql('select * from table_name', engine,chunk_size= 5000)
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
for result in results:
result.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
table_schema=schema, if_exists=if_exists)
如果你想将数据作为一个块加载,你可以删除 chunk_size 参数,但是如果 table 很大
,这可能会消耗内存
results = pd.read_sql('select * from table_name')
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
results.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
table_schema=schema, if_exists=if_exists)
我的建议是提取文件中的 Oracle table 内容(例如 CSV 格式)。将文件复制到 Cloud Storage。然后将它们加载到 BigQuery 中。
- 如果您要执行数据清理,您可以 运行 对加载的原始数据进行 SQL 查询,并将结果存储到新的 table.
- 如果您必须重复此操作,trigger a Cloud Function which load the file into BigQuery, on Google Cloud Storage event. You can also create scheduled queries 用于 运行ning 数据清理。
如果你想做的转换是SQL.
,数据流就没用了(昂贵,效率低,需要更多时间)但是,如果您需要请求外部 API(用于数据转换,例如 ML API)或者如果您希望将数据下沉到除 BigQuery(Firestore,大 Table、云 SQL、...),数据流是正确的工具
编辑
为了更深入地了解细节,我假设 table 位于同一数据集中。那么,代码就简单了
def hello_gcs_generic(data, context):
client = bigquery.Client()
dataset_id = 'my_dataset'
bucket = data['bucket']
path = data['name']
table_name = path[path.rfind('/')+1:path.rfind(('.'))]
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="date" # Name of the column to use for partitioning.
),
source_format=bigquery.SourceFormat.CSV
)
uri = "gs://{}/{}".format(bucket,path)
load_job = client.load_table_from_uri(
uri, dataset_ref.table(table_name), job_config=job_config
) # API request
print("Starting job {}".format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print("Job finished.")
此处,对存储桶中的每个文件都调用 Cloud Functions。因此,如果同时drop 300个文件,会触发300个函数,并行处理。
几点:
- Table 名称等于文件名。
- 默认情况下,行为是写入追加(将数据添加到 table。
- 我强烈建议您在字段上对 table 进行分区,这里是日期字段。
- 加载作业通常很快。如果超过 9 分钟,请使用 Cloud 运行(限制为 15 分钟)或不要等待加载结束(删除此
load_job.result()
) - CSV 以第 1 行开头。不是强制性的,但在模式自动检测的情况下,这有助于很好地命名列。
注意:我假设所有放入 Cloud Storage 的文件都必须集成到 BigQuery 中。如果没有,您可以按照 one of my articles
中的说明添加过滤器