如何使用 Python 将嵌套的 Hive 查询模式转换为 Bigquery 模式?

How to convert nested Hive query schema into Bigquery schema using Python?

问题前的快速说明: 在从本地 Hadoop 实例迁移到 BigQuery 的过程中,我们需要将大量 Hive 模式(schema)转换为 BigQuery 模式。我之前针对未嵌套模式的转换提出过类似的问题,@Anjela 热心地给出了非常有用的回答。但我们还有另一个用例: 将嵌套的 struct 类型模式转换为 BigQuery 模式,详细信息见下文。

示例 Hive 架构:

sample =

"reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"

所需的 BigQuery 架构:

# Target BigQuery schema for the Hive column
# "reports array<struct<...>>": one repeated RECORD whose members mirror the
# struct members; the nested struct becomes a nested RECORD and the
# array<string> member becomes a repeated STRING.
bigquery.SchemaField("reports", "RECORD", mode="REPEATED",
        fields=(
             bigquery.SchemaField('orderlineid', 'STRING'),
             bigquery.SchemaField('ordernumber', 'STRING'),
             bigquery.SchemaField('price', 'RECORD',
                 fields=(
                     bigquery.SchemaField('currencyCode', 'STRING'),
                     bigquery.SchemaField('value', 'FLOAT'),
                 )),
             bigquery.SchemaField('quantity', 'INTEGER'),
             bigquery.SchemaField('serialnumbers', 'STRING', mode='REPEATED'),
             bigquery.SchemaField('sku', 'STRING'),
        )
     )

我们从上一个问题中得到的以下脚本,对于将未嵌套模式转换为 BigQuery 模式很有用:

import re

from google.cloud import bigquery



def is_even(number):
    """Return True when ``number`` is divisible by two, False otherwise."""
    return number % 2 == 0

def clean_string(str_value):
    """Return ``str_value`` with every run of non-word characters and underscores removed."""
    unwanted = re.compile(r'[\W_]+')
    return unwanted.sub('', str_value)

def convert_to_bqdict(api_string):
    """
    Build a BigQuery schema (a list with one column dict) from a Hive column
    definition of the form ``"<name> array<struct<field:type,...>>"``.

    This only works for a struct with multiple fields; nested structs are not
    handled. It could give you an idea on constructing a schema dict for
    BigQuery.

    Args:
        api_string: Hive column definition, column name and type separated by
            a single space.

    Returns:
        list: ``[{"name", "type": "RECORD", "mode": "NULLABLE", "fields"}]``
        where ``fields`` holds one ``name``/``type``/``mode`` dict per struct
        member.
    """
    # Hard coded this since not sure what the string will look like if there are more inputs
    init_struct = api_string.split(' ')
    main_dict = {
        "name": init_struct[0],
        "type": "RECORD",
        # NOTE(review): the column is always emitted as NULLABLE, even when the
        # Hive type is an array<...> - confirm whether REPEATED is wanted.
        "mode": "NULLABLE",
    }

    cont_struct = init_struct[1].split('<')

    # parse fields inside of struct<
    # fields are seen on even indices (index 0 is the outermost type keyword)
    field_arr = []
    for i in range(2, len(cont_struct), 2):
        for elem in filter(None, cont_struct[i].split(',')):  # remove blank elements
            fields = list(filter(None, elem.split(':')))
            field_arr.append({
                "name": clean_string(fields[0]),
                # "type" works for STRING as of the moment refer to
                # https://cloud.google.com/bigquery/docs/schemas#standard_sql_data_types
                # for the accepted data types
                "type": clean_string(fields[1]).upper(),
                "mode": "NULLABLE",
            })

    main_dict["fields"] = field_arr  # assign dict to array of fields
    return [main_dict]

# Example Hive column definition: "<column name> <Hive type>".
sample = "reports array<struct<imageUrl:string,reportedBy:string,newfield:bool>>"

# Convert the Hive definition into a list of schema dicts.
bq_dict = convert_to_bqdict(sample)

# Create the table in the client's own project; '20211228' is used as the
# dataset id and "20220203" as the table id.
client = bigquery.Client()
project = client.project
dataset_ref = bigquery.DatasetReference(project, '20211228')
table_ref = dataset_ref.table("20220203")
table = bigquery.Table(table_ref, schema=bq_dict)
table = client.create_table(table)  # sends the create request and keeps the returned table

@Anjela B. 的上述脚本可以将如下所示的非嵌套 Hive 模式转换为 BigQuery 模式:

"name":"reports"
"col_type":"array<struct<imageUrl:string,reportedBy:string>>"

如能提供任何帮助或建议,将不胜感激。

解决方法如下:

    def getSchema2(self):
        """Download column metadata from ``self.urlAddress`` and build a schema list.

        The JSON response is read at ``["tableData"]["columns"]``; each column
        is expected to carry ``name``, ``col_type`` and ``badges``. Every
        column produces exactly one dict in the result (``name``, ``type``,
        ``mode`` and, for struct columns, ``fields``).

        Returns:
            tuple: ``(schema_arr, partition)``. ``schema_arr`` is the list of
            column dicts; ``partition`` is ``True`` when at least one column
            has a non-empty ``badges`` list, otherwise ``None``.

        Raises:
            SystemExit: when the request times out twice, is redirected too
                often, or fails with any other ``requests`` exception.
        """
        try:
            response = requests.get(self.urlAddress)
        except requests.exceptions.Timeout:
            # Retry once after a five-minute pause before giving up.
            time.sleep(300)
            try:
                response = requests.get(self.urlAddress)
            except requests.exceptions.Timeout as e:
                logger.error("Communication error: {}".format(e))
                raise SystemExit(e)
        except requests.exceptions.TooManyRedirects as e:
            logger.info("URL is not correct check your variables")
            # Without a response there is nothing to parse; stop like the
            # other failure branches instead of failing on an unbound name.
            raise SystemExit(e)
        except requests.exceptions.RequestException as e:
            logger.info("Communication error: {}".format(e))
            raise SystemExit(e)

        # Parse the body once instead of on every column access.
        columns = response.json()["tableData"]["columns"]
        print(columns)

        # Hive scalar types that are renamed; other scalars pass through as-is.
        scalar_types = {"bigint": "integer", "int": "integer", "double": "float"}

        schema_arr = []
        partition = None
        for column in columns:
            col_name = column["name"]

            if column["badges"]:
                # NOTE(review): any badge is taken as the partition marker and
                # the column type is set to the placeholder 'partition' -
                # confirm how the caller consumes this entry.
                partition = True
                schema_arr.append(
                    {"name": col_name, "mode": "NULLABLE", "type": "partition"}
                )
                continue

            col_type = column["col_type"]
            if 'map' in col_type:
                # Any type mentioning a map is flattened to a plain string.
                column_schema = {"name": col_name, "type": "string", "mode": "NULLABLE"}
            elif col_type in scalar_types:
                column_schema = {
                    "name": col_name,
                    "type": scalar_types[col_type],
                    "mode": "NULLABLE",
                }
            elif col_type == 'array<string>':
                column_schema = {"name": col_name, "type": "string", "mode": "REPEATED"}
            elif 'array<struct' in col_type or 'struct<' in col_type:
                # A fresh list per column so struct members never leak from
                # one column into the next.
                struct_fields = []
                for raw_field in extractField(col_type):
                    field_name = raw_field.split(":")[0]
                    if 'struct' not in raw_field:
                        struct_fields.append({
                            "name": field_name,
                            "type": raw_field.split(":")[1],
                            "mode": "NULLABLE",
                        })
                        continue

                    # One level of nesting: "name:struct<a:t1,b:t2>".
                    # cleanString removes the leftover '<' / '>' (and, as a
                    # side effect, any underscore) from each piece.
                    subfields = []
                    for raw_subfield in raw_field.split("struct")[1].split(","):
                        subfields.append({
                            "name": cleanString(raw_subfield.split(":")[0]),
                            "type": cleanString(raw_subfield.split(":")[1]),
                            "mode": "NULLABLE",
                        })
                    struct_fields.append({
                        "name": field_name,
                        "type": "RECORD",
                        "mode": "NULLABLE",
                        "fields": subfields,
                    })

                column_schema = {
                    "name": col_name,
                    "type": 'RECORD',
                    "mode": 'REPEATED' if 'array<struct' in col_type else 'NULLABLE',
                    "fields": struct_fields,
                }
            else:
                column_schema = {"name": col_name, "type": col_type, "mode": "NULLABLE"}

            schema_arr.append(column_schema)

        logger.info("Schema definition succesfully extracted from Amundsen")
        return schema_arr, partition

代码解释:

第一部分(requests.get): 该部分仅用于获取问题中给出的示例字符串。

第二部分: 第二部分的灵感来自@Anjela B.,他解决了这个问题的未嵌套类型。我只是为嵌套字段添加了一个字典和数组结构,每次我们检查字段是否包含嵌套结构时。

第三部分: 我们使用 extractField 函数来解析嵌套结构,然后用简单的 split 函数获取各子字段的名称 (col_name) 和类型 (col_type)。

from itertools import accumulate
import re

def extractField(S, depth=None):
    """Split a Hive complex type into its member declarations.

    Only commas sitting exactly ``depth`` angle brackets deep act as
    separators, so members that are themselves ``struct<...>`` or
    ``array<...>`` stay in one piece.

    Args:
        S: Hive type string such as ``array<struct<a:string,b:int>>``.
        depth: Bracket depth of the members to extract. When omitted it is 1
            for a type starting with ``struct<`` and 2 otherwise (the
            ``array<struct<...>>`` case).

    Returns:
        list[str]: one ``name:type`` string per member; ``['']`` when nothing
        sits at the requested depth.
    """
    if depth is None:
        depth = 1 if S.startswith("struct<") else 2

    # Depth of each character: it rises on the character after a '<' and
    # falls on the '>' itself, so a closing bracket belongs to the outer level.
    levels = accumulate((c == "<") - (n == ">") for c, n in zip(' ' + S, S + ' '))
    # Keep only characters at or below the target depth and turn the
    # separating commas into newlines.
    delim = "".join(
        "\n" if (ch == "," and lv == depth) else ch
        for ch, lv in zip(S, levels)
        if lv >= depth
    )
    return delim.split("\n")

def cleanString(str_value):
    """Return ``str_value`` without non-word characters and underscores."""
    stripper = re.compile(r'[\W_]+')
    return stripper.sub('', str_value)

最后,下面是生成 BigQuery schema 的函数:

def generate2(self):
    """Create the dataset ``self.database`` (location "EU") and the table
    ``self.database.self.table`` with ``self.schema``; both calls tolerate
    objects that already exist."""
    bq_client = bigquery.Client()

    # Dataset first: the table is created inside it.
    target_dataset = bigquery.Dataset(self.database)
    target_dataset.location = "EU"
    bq_client.create_dataset(target_dataset, timeout=30, exists_ok=True)

    table_id = self.database + "." + self.table
    bq_client.create_table(
        bigquery.Table(table_id, schema=self.schema),
        exists_ok=True,
    )

    logger.info("Schema created under google cloud project id as: {}".format(self.database))
    logger.info("Bigquery schema generated succesfully.")

结果:

可以将以下代码视为一种更通用的方法。该代码先把 schema 构造为 JSON 字符串,然后使用 Python 的 json 模块对其进行解析;解析得到的对象随后在创建 BigQuery 表时作为 schema 传入。

注意: 该代码适用于大多数 Hive 到 BigQuery 架构的转换。由于它基于此问题中考虑的架构,因此它可能尚未涵盖所有用例。

import json
import re

from google.cloud import bigquery

def clean_json_string(string):
    '''
    Remove trailing commas from a JSON string.

    A comma is dropped when it directly follows ``}``, ``]`` or a quote and
    is not followed (after optional whitespace) by ``{``, ``[`` or a quote,
    i.e. when no further element comes after it.
    '''
    regex = r'''(?<=[}\]"']),(?!\s*[{["'])'''
    # count is left at its default (replace all); passing it positionally
    # is deprecated in recent Python versions.
    return re.sub(regex, "", string)


def data_type_convert(string, data_type_conversion_dict):
    '''
    Rewrite Hive type names in ``string`` as BigQuery type names.

    Each key of ``data_type_conversion_dict`` is replaced by its value, in
    dict order, as a plain substring replacement over the whole string.
    '''
    converted = string
    for hive_type in data_type_conversion_dict:
        converted = converted.replace(hive_type, data_type_conversion_dict[hive_type])
    return converted


# Hive scalar types whose BigQuery name differs; any other scalar type is
# passed through upper-cased.
_HIVE_TO_BQ_TYPES = {
    "TINYINT": "INTEGER",
    "SMALLINT": "INTEGER",
    "INT": "INTEGER",
    "BIGINT": "INTEGER",
    "DOUBLE": "FLOAT",
}


def _split_top_level(text):
    '''Split ``text`` on the commas that are not nested inside <...> or (...).'''
    parts = []
    depth = 0
    start = 0
    for pos, char in enumerate(text):
        if char in "<(":
            depth += 1
        elif char in ">)":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append(text[start:pos])
            start = pos + 1
    parts.append(text[start:])
    return parts


def _build_field(name, hive_type, repeated=False):
    '''Return the schema dict for one column or struct member, recursing into nested types.'''
    lowered = hive_type.lower()
    mode = "REPEATED" if repeated else "NULLABLE"

    if lowered.startswith("array<") and lowered.endswith(">"):
        if repeated:
            # BigQuery has no array-of-array columns, so fail loudly rather
            # than emit a schema that does not match the data.
            raise ValueError("nested arrays are not supported: {!r}".format(name))
        # An array becomes its element type with mode REPEATED.
        return _build_field(name, hive_type[len("array<"):-1], repeated=True)

    if lowered.startswith("struct<") and lowered.endswith(">"):
        members = []
        for member in _split_top_level(hive_type[len("struct<"):-1]):
            member_name, separator, member_type = member.partition(":")
            if not separator:
                raise ValueError("malformed struct member: {!r}".format(member))
            members.append(_build_field(member_name, member_type))
        return {"mode": mode, "name": name, "type": "RECORD", "fields": members}

    bq_type = hive_type.upper()
    return {"mode": mode, "name": name, "type": _HIVE_TO_BQ_TYPES.get(bq_type, bq_type)}


def hive_to_bq_schema(sample):
    '''
    Convert a Hive column definition to a BigQuery schema, returned as JSON text.

    ``sample`` has the form ``"<column name> <Hive type>"``. ``array<T>``
    becomes ``T`` with mode REPEATED, ``struct<...>`` becomes a RECORD with
    one entry per member (to any nesting depth), and scalar types are mapped
    per field, so field names that happen to contain ``INT`` or ``DOUBLE``
    are left untouched.

    Returns:
        str: JSON array holding the single column's schema; pass it through
        ``json.loads`` to get the list of dicts.

    Raises:
        ValueError: if the type is missing, a struct member has no
            ``name:type`` form, or an array directly contains an array.
    '''
    tokens = sample.split()
    if len(tokens) < 2:
        raise ValueError("expected '<name> <hive type>', got {!r}".format(sample))

    # As before, only the first two whitespace-separated tokens are used.
    column_name, hive_type = tokens[0], tokens[1]
    return json.dumps([_build_field(column_name, hive_type)])


hive_schema = "reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"

# JSON text describing the BigQuery schema.
bq_schema = hive_to_bq_schema(hive_schema)

### Output BigQuery schema
print(bq_schema)

# The table needs the parsed list of field dicts, not the JSON text.
schema = json.loads(bq_schema)

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "project.dataset.table"

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

输出:

[
  {
    "mode": "REPEATED",
    "name": "reports",
    "type": "RECORD",
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "orderlineid",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "ordernumber",
        "type": "STRING"
      },
      {
        "mode": "REPEATED",
        "name": "price",
        "type": "RECORD",
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currencycode",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "value",
            "type": "FLOAT"
          }
        ]
      },
      {
        "mode": "NULLABLE",
        "name": "quantity",
        "type": "INTEGER"
      },
      {
        "mode": "REPEATED",
        "name": "serialnumbers",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "sku",
        "type": "STRING"
      }
    ]
  }
]