How to convert a nested Hive query schema into a BigQuery schema using Python?
A quick note before the question: as part of migrating from an on-premises Hadoop instance to BigQuery, we need to convert a large number of Hive schemas into BigQuery schemas. I asked a similar question about converting non-nested schemas, which @Anjela kindly answered with a very helpful solution. There is, however, another use case: converting nested struct-type schemas into BigQuery schemas. You can find the details below.
Sample Hive schema:
sample = "reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"
Desired BigQuery schema:
bigquery.SchemaField("reports", "RECORD", mode="REPEATED",
fields=(
bigquery.SchemaField('orderline', 'STRING'),
bigquery.SchemaField('ordernumber', 'STRING'),
bigquery.SchemaField('price', 'RECORD'),
fields=(
bigquery.SchemaField('currencyCode', 'STRING'),
bigquery.SchemaField('value', 'FLOAT')
)
bigquery.SchemaField('quantity', 'INTEGER'),
bigquery.SchemaField('serialnumbers', 'STRING', mode=REPEATED),
bigquery.SchemaField('sku', 'STRING'),
)
)
What we got from the previous question, which works well for converting a non-nested schema into a BigQuery schema:
import re
from google.cloud import bigquery

def is_even(number):
    return (number % 2) == 0

def clean_string(str_value):
    return re.sub(r'[\W_]+', '', str_value)

def convert_to_bqdict(api_string):
    """
    This only works for a struct with multiple fields
    This could give you an idea on constructing a schema dict for BigQuery
    """
    num_even = True
    main_dict = {}
    struct_dict = {}
    field_arr = []
    schema_arr = []
    # Hard coded this since not sure what the string will look like if there are more inputs
    init_struct = api_string.split(' ')
    main_dict["name"] = init_struct[0]
    main_dict["type"] = "RECORD"
    main_dict["mode"] = "NULLABLE"
    cont_struct = init_struct[1].split('<')
    num_elem = len(cont_struct)
    # parse fields inside of struct<
    for i in range(0, num_elem):
        num_even = is_even(i)
        # fields are seen on even indices
        if num_even and i != 0:
            temp = list(filter(None, cont_struct[i].split(',')))  # remove blank elements
            for elem in temp:
                fields = list(filter(None, elem.split(':')))
                struct_dict["name"] = clean_string(fields[0])
                # "type" works for STRING at the moment; refer to
                # https://cloud.google.com/bigquery/docs/schemas#standard_sql_data_types
                # for the accepted data types
                struct_dict["type"] = clean_string(fields[1]).upper()
                struct_dict["mode"] = "NULLABLE"
                field_arr.append(struct_dict)
                struct_dict = {}
    main_dict["fields"] = field_arr  # assign dict to array of fields
    schema_arr.append(main_dict)
    return schema_arr

sample = "reports array<struct<imageUrl:string,reportedBy:string,newfield:bool>>"
bq_dict = convert_to_bqdict(sample)

client = bigquery.Client()
project = client.project
dataset_ref = bigquery.DatasetReference(project, '20211228')
table_ref = dataset_ref.table("20220203")
table = bigquery.Table(table_ref, schema=bq_dict)
table = client.create_table(table)
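One caveat about clean_string worth noting (an added observation, easy to verify): the character class [\W_]+ strips underscores along with punctuation, so snake_case Hive field names lose their underscores:

print(clean_string("order_line_id"))  # prints: orderlineid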
@Anjela B.'s script above converts non-nested queries from a Hive schema to a BigQuery schema, where the input looks like this:
"name":"reports"
"col_type":"array<struct<imageUrl:string,reportedBy:string>>"
Any help/tips would be greatly appreciated.
The solution is as follows:
def getSchema2(self):
    try:
        getJsonUrl = requests.get(self.urlAddress)
    except requests.exceptions.Timeout:
        # wait five minutes, then retry once before giving up
        time.sleep(300)
        try:
            getJsonUrl = requests.get(self.urlAddress)
        except requests.exceptions.Timeout as e:
            logger.error("Communication error: {}".format(e))
            raise SystemExit(e)
    except requests.exceptions.TooManyRedirects:
        logger.error("URL is not correct; check your variables")
    except requests.exceptions.RequestException as e:
        logger.error("Communication error: {}".format(e))
        raise SystemExit(e)
    # cache the parsed response so it is not re-parsed on every access
    columns = getJsonUrl.json()["tableData"]["columns"]
    print(columns)
    main_dict = {}
    struct_dict = {}
    subfield_struct_dict = {}
    field_arr = []
    subfield_arr = []
    schema_arr = []
    partition = None
    for i in range(len(columns)):
        # columns carrying a badge are partition columns
        if len(columns[i]["badges"]) > 0:
            partition = True
            col_name = columns[i]["name"]
            col_type = 'partition'
            main_dict["name"] = col_name
            main_dict["mode"] = "NULLABLE"
            main_dict["type"] = col_type
            schema_arr.append(main_dict)
            main_dict = {}
        else:
            col_name = columns[i]["name"]
            col_type = columns[i]["col_type"]
            if 'map' in col_type:
                main_dict["name"] = col_name
                main_dict["type"] = 'string'
                main_dict["mode"] = "NULLABLE"
                schema_arr.append(main_dict)
                main_dict = {}
            elif col_type in ('bigint', 'int'):
                main_dict["name"] = col_name
                main_dict["type"] = 'integer'
                main_dict["mode"] = "NULLABLE"
                schema_arr.append(main_dict)
                main_dict = {}
            elif col_type == 'double':
                main_dict["name"] = col_name
                main_dict["type"] = 'float'
                main_dict["mode"] = "NULLABLE"
                schema_arr.append(main_dict)
                main_dict = {}
            elif col_type == 'array<string>':
                main_dict["name"] = col_name
                main_dict["type"] = 'string'
                main_dict["mode"] = 'REPEATED'
                schema_arr.append(main_dict)
                main_dict = {}
            elif 'array<struct' in col_type or 'struct<' in col_type:
                main_dict["name"] = col_name
                main_dict["type"] = 'RECORD'
                if 'array<struct' in col_type:
                    main_dict["mode"] = 'REPEATED'
                else:
                    main_dict["mode"] = 'NULLABLE'
                raw_value = extractField(col_type)
                for field in raw_value:
                    if 'struct' not in field:
                        field_col_name = field.split(":")[0]
                        field_col_type = field.split(":")[1]
                        field_mode = 'NULLABLE'
                        # an array<primitive> subfield becomes a REPEATED
                        # field of the element type
                        if field_col_type.startswith('array<'):
                            field_col_type = field_col_type[len('array<'):].rstrip('>')
                            field_mode = 'REPEATED'
                        struct_dict["name"] = field_col_name
                        struct_dict["type"] = field_col_type
                        struct_dict["mode"] = field_mode
                        field_arr.append(struct_dict)
                        struct_dict = {}
                    else:
                        struct_dict["name"] = field.split(":")[0]
                        struct_dict["type"] = "RECORD"
                        struct_dict["mode"] = 'NULLABLE'
                        raw_subfield_types = field.split("struct")[1].split(",")
                        for subfield in raw_subfield_types:
                            subfield_struct_dict["name"] = cleanString(subfield.split(":")[0])
                            subfield_struct_dict["type"] = cleanString(subfield.split(":")[1])
                            subfield_struct_dict["mode"] = "NULLABLE"
                            subfield_arr.append(subfield_struct_dict)
                            subfield_struct_dict = {}  # reset so entries stay distinct
                        struct_dict["fields"] = subfield_arr
                        subfield_arr = []  # reset for the next nested struct
                        field_arr.append(struct_dict)
                        struct_dict = {}
                main_dict["fields"] = field_arr
                field_arr = []  # reset for the next struct column
                schema_arr.append(main_dict)
                main_dict = {}
            else:
                main_dict["name"] = col_name
                main_dict["type"] = col_type
                main_dict["mode"] = "NULLABLE"
                schema_arr.append(main_dict)
                main_dict = {}
    if len(main_dict) != 0:
        schema_arr.append(main_dict)
    logger.info("Schema definition successfully extracted from Amundsen")
    return schema_arr, partition
Code explanation:
Part 1 (requests.get): this part is only used to retrieve the sample string given in the question (in our case from an Amundsen metadata endpoint).
Part 2: the second part is inspired by @Anjela B., who solved the non-nested version of this problem. I simply added a dictionary-and-array structure for the nested fields, filled whenever we detect that a field contains a nested struct.
Part 3: note that we use the extractField function to parse the nested struct; after that, simple split calls are enough to get the col_names and col_types of the subfields (see the example after the helpers below).
from itertools import accumulate
import re

def extractField(S):
    # running '<'/'>' nesting depth for each character of S
    levels = accumulate((c == "<") - (n == ">") for c, n in zip(' ' + S, S + ' '))
    # keep only the characters inside the struct body (depth >= 2) and turn
    # depth-2 commas into newlines, leaving commas of nested structs intact
    delim = "".join([c, "\n"][c == "," and lv == 2] for c, lv in zip(S, levels) if lv >= 2)
    fields = delim.split("\n")
    return fields

def cleanString(str_value):
    return re.sub(r'[\W_]+', '', str_value)
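For the sample schema from the question, extractField splits only on the commas at struct depth, leaving nested structs and arrays intact (the expected output below follows from the depth logic above):

fields = extractField(
    "array<struct<orderlineid:string,ordernumber:string,"
    "price:struct<currencycode:string,value:double>,"
    "quantity:int,serialnumbers:array<string>,sku:string>>"
)
print(fields)
# ['orderlineid:string', 'ordernumber:string',
#  'price:struct<currencycode:string,value:double>',
#  'quantity:int', 'serialnumbers:array<string>', 'sku:string']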
Finally, here is the function that generates the BigQuery schema:
def generate2(self):
    client = bigquery.Client()
    dataset = bigquery.Dataset(self.database)
    dataset.location = "EU"
    dataset = client.create_dataset(dataset, timeout=30, exists_ok=True)
    query = bigquery.Table(self.database + "." + self.table, schema=self.schema)
    query = client.create_table(query, exists_ok=True)
    logger.info("Schema created under Google Cloud project id as: {}".format(self.database))
    logger.info("BigQuery schema generated successfully.")
Result:
Consider the following code as a more generic approach. The code constructs the JSON object as a string and then parses it with Python's json module. The JSON object is then passed as the schema when the BigQuery table is created.
Note: the code works for most Hive-to-BigQuery schema conversions. Since it is based on the schema considered in this question, it may not cover every use case yet.
import json
import re
from google.cloud import bigquery

def clean_json_string(string):
    '''
    Function to remove trailing commas in the JSON string.
    '''
    regex = r'''(?<=[}\]"']),(?!\s*[{["'])'''
    clean_string = re.sub(regex, "", string, 0)
    return clean_string

def data_type_convert(string, data_type_conversion_dict):
    '''
    Function to map Hive data types to BigQuery data types.
    '''
    for a, b in data_type_conversion_dict.items():
        string = string.replace(a, b)
    return string

def hive_to_bq_schema(sample):
    '''
    Function to convert Hive table schema to BigQuery table schema.
    '''
    init_dict = '[{"mode":"REPEATED", "name":"reports", "type":"RECORD", "fields":'
    init_list = re.split(" ", sample)
    init_list[1] = re.split(",|<|>", init_list[1])
    json_string = ""
    struct_count = 0
    array_count = 0
    data_type_conversion_dict = {"DOUBLE": "FLOAT", "INT": "INTEGER"}
    for item in init_list[1]:
        # a bare "array" token opens the fields list of the root RECORD
        if "array" == item:
            json_string += '['
        # empty tokens come from ">" delimiters and close open structs/arrays
        if item == "":
            if struct_count > 0:
                json_string += ']},'
                struct_count -= 1
            if array_count > 1:
                json_string += '],'
                array_count -= 1
        if ":" in item:
            temp = item.split(":")
            if "struct" not in item and "array" not in item:
                # a plain "name:type" pair becomes a NULLABLE leaf field
                json_string += '{"mode":"NULLABLE", "name":"'+temp[0]+'", "type":"'+temp[1].upper()+'"},'
            elif "struct" in item:
                # "name:struct" opens a RECORD with its own fields list
                json_string += '{"mode":"REPEATED", "name":"'+temp[0]+'", "type": "RECORD", "fields":['
                struct_count += 1
            elif "array" in item:
                # "name:array" becomes a REPEATED field typed by the next token
                json_string += '{"mode":"REPEATED", "name":"'+temp[0]+'", "type": "'+(str(init_list[1][init_list[1].index(item)+1])).upper()+'"},'
                array_count += 1
    bq_schema = init_dict+json_string+"]}]"
    return data_type_convert(clean_json_string(bq_schema), data_type_conversion_dict)

hive_schema = "reports array<struct<orderlineid:string,ordernumber:string,price:struct<currencycode:string,value:double>,quantity:int,serialnumbers:array<string>,sku:string>>"
bq_schema = hive_to_bq_schema(hive_schema)

### Output BigQuery schema
print(bq_schema)
schema = json.loads(bq_schema)

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "project.dataset.table"

# Pass the parsed schema (a list of field dicts), not the raw JSON string.
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
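Quick checks of the two helpers above (added examples; the inputs are made up for illustration): clean_json_string drops the trailing commas the string builder leaves behind, and data_type_convert is a plain substring replacement over the already-uppercased type names:

print(clean_json_string('[{"name":"sku","type":"STRING"},]'))
# [{"name":"sku","type":"STRING"}]
print(data_type_convert('{"type":"DOUBLE"},{"type":"INT"}', {"DOUBLE": "FLOAT", "INT": "INTEGER"}))
# {"type":"FLOAT"},{"type":"INTEGER"}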
Output:
[
    {
        "mode": "REPEATED",
        "name": "reports",
        "type": "RECORD",
        "fields": [
            {
                "mode": "NULLABLE",
                "name": "orderlineid",
                "type": "STRING"
            },
            {
                "mode": "NULLABLE",
                "name": "ordernumber",
                "type": "STRING"
            },
            {
                "mode": "REPEATED",
                "name": "price",
                "type": "RECORD",
                "fields": [
                    {
                        "mode": "NULLABLE",
                        "name": "currencycode",
                        "type": "STRING"
                    },
                    {
                        "mode": "NULLABLE",
                        "name": "value",
                        "type": "FLOAT"
                    }
                ]
            },
            {
                "mode": "NULLABLE",
                "name": "quantity",
                "type": "INTEGER"
            },
            {
                "mode": "REPEATED",
                "name": "serialnumbers",
                "type": "STRING"
            },
            {
                "mode": "NULLABLE",
                "name": "sku",
                "type": "STRING"
            }
        ]
    }
]
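As a final check (an added sketch, not part of the original answer), the parsed JSON can also be converted into SchemaField objects explicitly; SchemaField.from_api_repr is part of the google-cloud-bigquery client library and makes the nested structure easy to inspect before creating the table:

from google.cloud import bigquery

schema_fields = [bigquery.SchemaField.from_api_repr(field) for field in schema]
print(schema_fields[0].name, schema_fields[0].field_type, schema_fields[0].mode)
# reports RECORD REPEATED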