如何将 Python 字典映射到 BigQuery 架构
How to map a Python dict to a BigQuery schema
我有一个包含一些嵌套值的字典:
# Sample source record. Note that "system" is not declared in the schema,
# so it should not survive the mapping.
my_dict = dict(
    id=1,
    name="test",
    system="x",
    date="2015-07-27",
    profile=dict(
        location="My City",
        preferences=[
            dict(code="5", description="MyPreference"),
        ],
    ),
    logins=[
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41",
    ],
)
另外,我有如下的 BigQuery 表架构:
# BigQuery-style table schema as a plain dict: a "fields" list whose entries
# carry a name, a type and (optionally) a mode; RECORD entries nest their own
# "fields" list.
schema = {
    "fields": [
        {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        {"name": "name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "date", "type": "TIMESTAMP", "mode": "REQUIRED"},
        {
            "name": "profile",
            "type": "RECORD",
            "fields": [
                {"name": "location", "type": "STRING", "mode": "NULLABLE"},
                {
                    "name": "preferences",
                    "type": "RECORD",
                    "mode": "REPEATED",
                    "fields": [
                        {"name": "code", "type": "STRING", "mode": "NULLABLE"},
                        {"name": "description", "type": "STRING", "mode": "NULLABLE"},
                    ],
                },
            ],
        },
        {"name": "logins", "type": "TIMESTAMP", "mode": "REPEATED"},
    ]
}
我想遍历原始的 my_dict,并按照 schema 的结构构建一个新的 dict。换句话说,就是遍历架构(schema),并从原始的 my_dict 中选取对应的值。
从而构建出如下的新字典(请注意,架构中不存在的字段 "system" 没有被复制):
# Expected result: my_dict restricted to the fields the schema declares
# ("system" is dropped).
new_dict = dict(
    id=1,
    name="test",
    date="2015-07-27",
    profile=dict(
        location="My City",
        preferences=[
            dict(code="5", description="MyPreference"),
        ],
    ),
    logins=[
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41",
    ],
)
如果没有嵌套字段,只需遍历 dict.items() 并复制值,事情会简单得多;但我该如何以递归方式访问原始字典来构建新字典呢?
我已经编写了一个递归函数来完成这项工作。我不确定这是否是最好的方法,但它可以工作:
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    """Copy the values of ``source_dict`` that are declared in ``schema``.

    ``schema`` is a schema dict of the form ``{"fields": [...]}``; a RECORD
    field dict has the same shape and is used as the sub-schema when
    recursing. Each field is a dict with ``"name"`` and ``"type"`` and,
    optionally, ``"mode"`` and (for RECORD/STRUCT fields) a nested
    ``"fields"`` list. Keys of ``source_dict`` that the schema does not
    declare are dropped; schema fields absent from ``source_dict`` are skipped.

    Args:
        source_dict: Dict to read values from.
        schema: Schema (or RECORD field) dict whose ``"fields"`` drive the copy.
        dest_dict: Optional dict to fill in place. A new dict is created when
            omitted.

    Returns:
        ``dest_dict``, filled with the selected values.
    """
    if dest_dict is None:
        dest_dict = {}
    for field in schema['fields']:
        name = field['name']
        # Only work on values that exist in the source.
        if name not in source_dict:
            continue
        value = source_dict[name]
        repeated = (field.get('mode') or '').lower() == 'repeated'
        nested = field['type'].lower() in ('record', 'struct') and 'fields' in field
        if value is None:
            # Pass NULLs through instead of failing to iterate/recurse into None.
            dest_dict[name] = None
        elif nested and repeated:
            # List of records: map every element against the record's sub-fields.
            dest_dict[name] = [map_dict_to_bq_schema(item, field) for item in value]
        elif nested:
            # Single record: the field dict itself serves as the sub-schema.
            dest_dict[name] = map_dict_to_bq_schema(value, field)
        elif repeated:
            # List of scalars: copy so the result does not alias the source list.
            dest_dict[name] = list(value)
        else:
            dest_dict[name] = value
    return dest_dict
# Fill a fresh dict in place with the schema-selected values of my_dict.
new_dict = dict()
map_dict_to_bq_schema(my_dict, schema, dest_dict=new_dict)
我更新了这个函数,因为 SchemaField 的用法发生了一些变化。
# [START] map_dict_to_bq_schema
# Function to take a dictionary and a BigQuery schema (SchemaField objects)
# and return a new dict holding only the keys the schema declares, ready to
# feed into BigQuery.
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    """Copy the values of ``source_dict`` that are declared in ``schema``.

    Args:
        source_dict: Dict to read values from. Keys not named in the schema
            are ignored.
        schema: A list or tuple of SchemaField-like objects (anything with
            ``name``, ``field_type``, ``mode`` and ``fields`` attributes), or
            a single such field, which is treated as a one-element list.
        dest_dict: Optional dict to fill in place. A new dict is created when
            omitted.

    Returns:
        ``dest_dict`` filled with the selected values. RECORD/STRUCT fields are
        mapped recursively against their sub-fields; schema fields absent from
        ``source_dict`` are left out.
    """
    if dest_dict is None:
        dest_dict = {}
    # A single field was passed; treat it as a one-field schema.
    # (tuple is accepted because SchemaField.fields is a tuple.)
    if not isinstance(schema, (list, tuple)):
        schema = [schema]
    for field in schema:
        if field.name not in source_dict:
            continue
        value = source_dict[field.name]
        nested = field.field_type in ("RECORD", "STRUCT") and bool(field.fields)
        if value is None:
            # Pass NULLs through instead of failing to iterate/recurse into None.
            dest_dict[field.name] = None
        elif nested and field.mode == "REPEATED":
            # Array of records: map each element against the record's sub-fields.
            dest_dict[field.name] = [
                map_dict_to_bq_schema(item, field.fields) for item in value
            ]
        elif nested:
            # Recurse into the record's sub-fields, not the record field itself
            # (that would look up the record's own name inside the nested dict).
            dest_dict[field.name] = map_dict_to_bq_schema(value, field.fields)
        elif field.mode == "REPEATED":
            # Array of scalars: copy so the result does not alias the source list.
            dest_dict[field.name] = list(value)
        else:
            # Regular field
            dest_dict[field.name] = value
    return dest_dict
# [END] map_dict_to_bq_schema
考虑使用 schema_from_json:
# Load the table schema (a list of SchemaField objects, see the repr below)
# from a JSON schema file.
# NOTE(review): `bq_client` is not defined in this snippet; presumably a
# google.cloud.bigquery.Client instance - confirm.
my_schema = bq_client.schema_from_json('path/to/schema/file.json')
如果您需要在代码中写出架构,可以复制它的 repr 表示:
>>> my_schema
[SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]
并编辑它:
from google.cloud import bigquery as bq
# Schema written out in code. Everything after `mode` is passed by keyword:
# NOTE: newer google-cloud-bigquery releases (3.x) inserted
# `default_value_expression` as the 4th positional parameter, so the
# positional form copied from the repr would bind `()` to `description`
# and `None` to `fields`.
my_edited_schema = [
    bq.SchemaField('city', 'STRING', mode='NULLABLE', description=None, fields=(), policy_tags=None),
    bq.SchemaField('address', 'STRING', mode='NULLABLE', description=None, fields=(), policy_tags=None),
]
我有一个包含一些嵌套值的字典:
# Sample source record. Note that "system" is not declared in the schema,
# so it should not survive the mapping.
my_dict = dict(
    id=1,
    name="test",
    system="x",
    date="2015-07-27",
    profile=dict(
        location="My City",
        preferences=[
            dict(code="5", description="MyPreference"),
        ],
    ),
    logins=[
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41",
    ],
)
另外,我有如下的 BigQuery 表架构:
# BigQuery-style table schema as a plain dict: a "fields" list whose entries
# carry a name, a type and (optionally) a mode; RECORD entries nest their own
# "fields" list.
schema = {
    "fields": [
        {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        {"name": "name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "date", "type": "TIMESTAMP", "mode": "REQUIRED"},
        {
            "name": "profile",
            "type": "RECORD",
            "fields": [
                {"name": "location", "type": "STRING", "mode": "NULLABLE"},
                {
                    "name": "preferences",
                    "type": "RECORD",
                    "mode": "REPEATED",
                    "fields": [
                        {"name": "code", "type": "STRING", "mode": "NULLABLE"},
                        {"name": "description", "type": "STRING", "mode": "NULLABLE"},
                    ],
                },
            ],
        },
        {"name": "logins", "type": "TIMESTAMP", "mode": "REPEATED"},
    ]
}
我想遍历原始的 my_dict,并按照 schema 的结构构建一个新的 dict。换句话说,就是遍历架构(schema),并从原始的 my_dict 中选取对应的值。
从而构建出如下的新字典(请注意,架构中不存在的字段 "system" 没有被复制):
# Expected result: my_dict restricted to the fields the schema declares
# ("system" is dropped).
new_dict = dict(
    id=1,
    name="test",
    date="2015-07-27",
    profile=dict(
        location="My City",
        preferences=[
            dict(code="5", description="MyPreference"),
        ],
    ),
    logins=[
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41",
    ],
)
如果没有嵌套字段,只需遍历 dict.items() 并复制值,事情会简单得多;但我该如何以递归方式访问原始字典来构建新字典呢?
我已经编写了一个递归函数来完成这项工作。我不确定这是否是最好的方法,但它可以工作:
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    """Copy the values of ``source_dict`` that are declared in ``schema``.

    ``schema`` is a schema dict of the form ``{"fields": [...]}``; a RECORD
    field dict has the same shape and is used as the sub-schema when
    recursing. Each field is a dict with ``"name"`` and ``"type"`` and,
    optionally, ``"mode"`` and (for RECORD/STRUCT fields) a nested
    ``"fields"`` list. Keys of ``source_dict`` that the schema does not
    declare are dropped; schema fields absent from ``source_dict`` are skipped.

    Args:
        source_dict: Dict to read values from.
        schema: Schema (or RECORD field) dict whose ``"fields"`` drive the copy.
        dest_dict: Optional dict to fill in place. A new dict is created when
            omitted.

    Returns:
        ``dest_dict``, filled with the selected values.
    """
    if dest_dict is None:
        dest_dict = {}
    for field in schema['fields']:
        name = field['name']
        # Only work on values that exist in the source.
        if name not in source_dict:
            continue
        value = source_dict[name]
        repeated = (field.get('mode') or '').lower() == 'repeated'
        nested = field['type'].lower() in ('record', 'struct') and 'fields' in field
        if value is None:
            # Pass NULLs through instead of failing to iterate/recurse into None.
            dest_dict[name] = None
        elif nested and repeated:
            # List of records: map every element against the record's sub-fields.
            dest_dict[name] = [map_dict_to_bq_schema(item, field) for item in value]
        elif nested:
            # Single record: the field dict itself serves as the sub-schema.
            dest_dict[name] = map_dict_to_bq_schema(value, field)
        elif repeated:
            # List of scalars: copy so the result does not alias the source list.
            dest_dict[name] = list(value)
        else:
            dest_dict[name] = value
    return dest_dict
# Fill a fresh dict in place with the schema-selected values of my_dict.
new_dict = dict()
map_dict_to_bq_schema(my_dict, schema, dest_dict=new_dict)
我更新了这个函数,因为 SchemaField 的用法发生了一些变化。
# [START] map_dict_to_bq_schema
# Function to take a dictionary and a BigQuery schema (SchemaField objects)
# and return a new dict holding only the keys the schema declares, ready to
# feed into BigQuery.
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    """Copy the values of ``source_dict`` that are declared in ``schema``.

    Args:
        source_dict: Dict to read values from. Keys not named in the schema
            are ignored.
        schema: A list or tuple of SchemaField-like objects (anything with
            ``name``, ``field_type``, ``mode`` and ``fields`` attributes), or
            a single such field, which is treated as a one-element list.
        dest_dict: Optional dict to fill in place. A new dict is created when
            omitted.

    Returns:
        ``dest_dict`` filled with the selected values. RECORD/STRUCT fields are
        mapped recursively against their sub-fields; schema fields absent from
        ``source_dict`` are left out.
    """
    if dest_dict is None:
        dest_dict = {}
    # A single field was passed; treat it as a one-field schema.
    # (tuple is accepted because SchemaField.fields is a tuple.)
    if not isinstance(schema, (list, tuple)):
        schema = [schema]
    for field in schema:
        if field.name not in source_dict:
            continue
        value = source_dict[field.name]
        nested = field.field_type in ("RECORD", "STRUCT") and bool(field.fields)
        if value is None:
            # Pass NULLs through instead of failing to iterate/recurse into None.
            dest_dict[field.name] = None
        elif nested and field.mode == "REPEATED":
            # Array of records: map each element against the record's sub-fields.
            dest_dict[field.name] = [
                map_dict_to_bq_schema(item, field.fields) for item in value
            ]
        elif nested:
            # Recurse into the record's sub-fields, not the record field itself
            # (that would look up the record's own name inside the nested dict).
            dest_dict[field.name] = map_dict_to_bq_schema(value, field.fields)
        elif field.mode == "REPEATED":
            # Array of scalars: copy so the result does not alias the source list.
            dest_dict[field.name] = list(value)
        else:
            # Regular field
            dest_dict[field.name] = value
    return dest_dict
# [END] map_dict_to_bq_schema
考虑使用 schema_from_json:
# Load the table schema (a list of SchemaField objects, see the repr below)
# from a JSON schema file.
# NOTE(review): `bq_client` is not defined in this snippet; presumably a
# google.cloud.bigquery.Client instance - confirm.
my_schema = bq_client.schema_from_json('path/to/schema/file.json')
如果您需要在代码中写出架构,可以复制它的 repr 表示:
>>> my_schema
[SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]
并编辑它:
from google.cloud import bigquery as bq
# Schema written out in code. Everything after `mode` is passed by keyword:
# NOTE: newer google-cloud-bigquery releases (3.x) inserted
# `default_value_expression` as the 4th positional parameter, so the
# positional form copied from the repr would bind `()` to `description`
# and `None` to `fields`.
my_edited_schema = [
    bq.SchemaField('city', 'STRING', mode='NULLABLE', description=None, fields=(), policy_tags=None),
    bq.SchemaField('address', 'STRING', mode='NULLABLE', description=None, fields=(), policy_tags=None),
]