How to map a Python dict to a BigQuery schema

I have a dictionary with some nested values:

my_dict = {
    "id": 1,
    "name": "test",
    "system": "x",
    "date": "2015-07-27",
    "profile": {
        "location": "My City",
        "preferences": [
            {
                "code": "5",
                "description": "MyPreference",
            }
        ]
    },
    "logins": [
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41"
    ]
}

And I have a BigQuery table schema like this:

schema = {
    "fields": [
        {'name':'id', 'type':'INTEGER', 'mode':'REQUIRED'},
        {'name':'name', 'type':'STRING', 'mode':'REQUIRED'},
        {'name':'date', 'type':'TIMESTAMP', 'mode':'REQUIRED'},
        {'name':'profile', 'type':'RECORD', 'fields':[
            {'name':'location', 'type':'STRING', 'mode':'NULLABLE'},
            {'name':'preferences', 'type':'RECORD', 'mode':'REPEATED', 'fields':[
                {'name':'code', 'type':'STRING', 'mode':'NULLABLE'},
                {'name':'description', 'type':'STRING', 'mode':'NULLABLE'}
            ]},
        ]},
        {'name':'logins', 'type':'TIMESTAMP', 'mode':'REPEATED'}
    ]
}

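I want to walk the original my_dict and build a new dict that follows the structure of the schema. In other words, iterate over the schema and pick the matching values out of the original my_dict.

The goal is to build a new dictionary like this (note that the field "system", which is not in the schema, is not copied):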

new_dict = {
    "id": 1,
    "name": "test",
    "date": "2015-07-27",
    "profile": {
        "location": "My City",
        "preferences": [
            {
                "code": "5",
                "description": "MyPreference",
            }
        ]
    },
    "logins": [
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41"
    ]
}

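Without nested fields it would be easy to iterate over dict.items() and copy the values, but how can I build the new dictionary while walking the original one recursively?

I have built a recursive function to do this. I'm not sure it is the best approach, but it works: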

def map_dict_to_bq_schema(source_dict, schema, dest_dict):
    # iterate over every field of the current schema level
    for field in schema['fields']:
        # only work on values that exist in the source
        if field['name'] in source_dict:
            # nested field
            if field['type'].lower() == 'record' and 'fields' in field:
                # list of records
                if 'mode' in field and field['mode'].lower() == 'repeated':
                    dest_dict[field['name']] = []
                    for item in source_dict[field['name']]:
                        new_item = {}
                        map_dict_to_bq_schema(item, field, new_item)
                        dest_dict[field['name']].append(new_item)
                # single record
                else:
                    dest_dict[field['name']] = {}
                    map_dict_to_bq_schema(source_dict[field['name']], field, dest_dict[field['name']])
            # list of plain values
            elif 'mode' in field and field['mode'].lower() == 'repeated':
                dest_dict[field['name']] = []
                for item in source_dict[field['name']]:
                    dest_dict[field['name']].append(item)
            # plain field
            else:
                dest_dict[field['name']] = source_dict[field['name']]

new_dict = {}
map_dict_to_bq_schema(my_dict, schema, new_dict)
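
For comparison, here is a minimal sketch of the same idea written so that it returns the new dict instead of filling one passed in by the caller. The name filter_by_schema and the extra "internal_note" key are made up for illustration; the schema is assumed to use the same plain-dict layout as in the question.

```python
def filter_by_schema(source, fields):
    """Return a new dict with only the keys of `source` named in `fields`."""
    result = {}
    for field in fields:
        name = field["name"]
        if name not in source:
            continue  # values missing from the source are simply skipped
        value = source[name]
        is_record = field["type"].upper() == "RECORD" and "fields" in field
        # A missing mode is treated as NULLABLE (an assumption of this sketch).
        is_repeated = field.get("mode", "NULLABLE").upper() == "REPEATED"
        if is_record and is_repeated:
            result[name] = [filter_by_schema(item, field["fields"]) for item in value]
        elif is_record:
            result[name] = filter_by_schema(value, field["fields"])
        elif is_repeated:
            result[name] = list(value)  # copy the list of plain values
        else:
            result[name] = value
    return result


# Trimmed-down schema and record based on the question's example.
example_schema = {
    "fields": [
        {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        {"name": "profile", "type": "RECORD", "fields": [
            {"name": "location", "type": "STRING", "mode": "NULLABLE"},
            {"name": "preferences", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "code", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
        {"name": "logins", "type": "TIMESTAMP", "mode": "REPEATED"},
    ]
}

example_record = {
    "id": 1,
    "system": "x",  # not in the schema, so it should be dropped
    "profile": {
        "location": "My City",
        "preferences": [{"code": "5", "internal_note": "also dropped"}],
    },
    "logins": ["2015-07-27 07:01:03", "2015-07-27 08:27:41"],
}

cleaned = filter_by_schema(example_record, example_schema["fields"])
print(cleaned)
```

Returning the result makes the recursion a little easier to read, since each level builds and hands back its own dict. Keys that are missing from the schema are dropped at every nesting level, not only at the top.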

I updated the function because the way SchemaField is used has changed somewhat.

# [START] map_dict_to_bq_schema
# Take a dictionary and a BigQuery schema (a list of SchemaField objects)
# and return a new dict holding only the fields named in the schema.
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    if dest_dict is None:
        dest_dict = {}
    # Use the existing schema to iterate over all the fields.
    # Note: some fields may be nested (those are then flagged as a RECORD)
    if not isinstance(schema, (list, tuple)):
        # This is an individual field.
        schema = [schema]
    # List of fields...
    for field in schema:
        if field.name not in source_dict:
            continue
        value = source_dict[field.name]
        # Nested object
        if field.field_type == "RECORD" and len(field.fields) > 0:
            if field.mode == "REPEATED":
                # Recursive! Each item is mapped against the nested field list.
                dest_dict[field.name] = [
                    map_dict_to_bq_schema(item, field.fields) for item in value
                ]
            else:
                # Recursive!
                dest_dict[field.name] = map_dict_to_bq_schema(value, field.fields)
        # Array of plain values
        elif field.mode == "REPEATED":
            dest_dict[field.name] = list(value)
        # Regular field
        else:
            dest_dict[field.name] = value
    # Done...
    return dest_dict
# [END] map_dict_to_bq_schema

Consider using schema_from_json:

my_schema = bq_client.schema_from_json('path/to/schema/file.json')
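
As far as I know, the file passed to schema_from_json is a JSON array of field definitions with the same name/type/mode/fields keys used in the question's schema. A small sketch of producing and reading back such a file with only the standard library (the temporary path and the variable names are made up for illustration, and the client call itself is left as a comment because it needs the library and credentials):

```python
import json
import os
import tempfile

# Field definitions in the same layout as the question's schema["fields"].
schema_fields = [
    {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "profile", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "location", "type": "STRING", "mode": "NULLABLE"},
    ]},
    {"name": "logins", "type": "TIMESTAMP", "mode": "REPEATED"},
]

# Write the list (not the wrapping {"fields": ...} dict) to a JSON file.
schema_path = os.path.join(tempfile.mkdtemp(), "schema.json")
with open(schema_path, "w", encoding="utf-8") as handle:
    json.dump(schema_fields, handle, indent=2)

# Read it back to confirm the file round-trips unchanged.
with open(schema_path, encoding="utf-8") as handle:
    loaded_fields = json.load(handle)

# With google-cloud-bigquery installed and a client configured, the same
# file could then be passed to the call shown above:
#     my_schema = bq_client.schema_from_json(schema_path)
print([field["name"] for field in loaded_fields])
```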

If you need the schema as code, you can copy its printed representation:

>>> my_schema
[SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]

and edit it:

from google.cloud import bigquery as bq
my_edited_schema = [bq.SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
bq.SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]