使用 boto3 调用 AWS Glue create_partition 成功，但 Athena 查询未返回结果
AWS Glue create_partition via boto3 succeeds, but Athena queries return no results
我有一个使用 create_partition() 创建新分区的 Glue 脚本。该 Glue 脚本运行成功，并且在 Athena 控制台中执行 SHOW PARTITIONS 时可以看到新分区。编写这个创建分区的 Glue 脚本时，我参考了这里的示例代码：https://medium.com/@bv_subhash/demystifying-the-ways-of-creating-partitions-in-glue-catalog-on-partitioned-s3-data-for-faster-e25671e65574
但当我在 Athena 中对新添加的分区运行查询时，没有返回任何结果。
即使已经使用 create_partition 添加了分区，是否仍然需要运行 MSCK 命令？欢迎任何建议。
.
我自己找到了解决方案，在此与 SO 社区分享，希望对他人有所帮助。以下代码作为 Glue 作业运行时会创建分区，并且新分区在 Athena 中可以被查询。请根据需要修改/添加参数值：数据库名称、表名称以及分区列。
import boto3
import urllib.parse
import os
import copy
import sys
# Job configuration. These are hard-coded placeholders; they could instead be
# supplied as workflow / job parameters.
DATABASE_NAME, TABLE_NAME = 'my_db', 'enter_table_name'
# Values of the emp_id and file_id partition columns for the new partition.
emp_id_tmp = file_id_tmp = ''
# Glue Data Catalog client used by the functions and statements below.
glue_client = boto3.client('glue')
def get_current_schema(database_name, table_name, *, client=None):
    """Fetch the parts of a Glue table definition needed to create partitions.

    Args:
        database_name: Glue Data Catalog database that contains the table.
        table_name: Name of the table to read.
        client: Optional Glue client (keyword-only). Defaults to the
            module-level ``glue_client``; pass one explicitly to reuse a
            differently configured client.

    Returns:
        dict with keys ``input_format``, ``output_format``,
        ``table_location``, ``serde_info`` and ``partition_keys`` copied from
        the table's ``StorageDescriptor`` / ``PartitionKeys``.

    If the ``get_table`` call raises, the error is printed and the process
    is terminated via ``sys.exit(-1)``.
    """
    if client is None:
        client = glue_client
    try:
        # Use the arguments, not the module-level DATABASE_NAME / TABLE_NAME
        # globals, so the function really reads the table it was asked for.
        response = client.get_table(DatabaseName=database_name, Name=table_name)
    except Exception as error:
        # Job-level boundary: surface why the lookup failed, then abort.
        print("Exception while fetching table info: {}".format(error))
        sys.exit(-1)

    table = response['Table']
    storage = table['StorageDescriptor']
    return {
        'input_format': storage['InputFormat'],
        'output_format': storage['OutputFormat'],
        'table_location': storage['Location'],
        'serde_info': storage['SerdeInfo'],
        'partition_keys': table['PartitionKeys'],
    }
def generate_partition_input_list(table_data, emp_id=None, file_id=None):
    """Build the ``PartitionInputList`` for ``batch_create_partition``.

    Args:
        table_data: dict as returned by ``get_current_schema``; must contain
            ``table_location``, ``input_format``, ``output_format`` and
            ``serde_info``.
        emp_id: Value of the ``emp_id`` partition column. Defaults to the
            module-level ``emp_id_tmp`` when omitted.
        file_id: Value of the ``file_id`` partition column. Defaults to the
            module-level ``file_id_tmp`` when omitted.

    Returns:
        A one-element list with the partition input for
        ``<table_location>/emp_id=<emp_id>/file_id=<file_id>/``, reusing the
        table's input/output formats and SerDe.
    """
    if emp_id is None:
        emp_id = emp_id_tmp
    if file_id is None:
        file_id = file_id_tmp

    # If the table location already ends with '/', naive joining would give
    # ".../table//emp_id=...", a different S3 prefix than the one holding the
    # data, so the partition would exist but Athena would read nothing from it.
    base_location = table_data['table_location'].rstrip('/')
    part_location = "{}/emp_id={}/file_id={}/".format(base_location, emp_id, file_id)

    partition_input = {
        # Glue expects string values, ordered like the table's partition keys.
        # NOTE(review): assumes PartitionKeys are (emp_id, file_id) in that
        # order, matching the path built above — confirm for your table.
        'Values': [str(emp_id), str(file_id)],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info'],
        },
    }
    return [partition_input]
# Create the partition(s) in the Glue Data Catalog from the table's settings.
table_data = get_current_schema(DATABASE_NAME, TABLE_NAME)
input_list = generate_partition_input_list(table_data)
try:
    create_partition_response = glue_client.batch_create_partition(
        DatabaseName=DATABASE_NAME,
        TableName=TABLE_NAME,
        PartitionInputList=input_list
    )
except Exception as e:
    # Handle exception as per your business requirements
    print(e)
else:
    # batch_create_partition does not raise when individual partitions fail
    # (e.g. one that already exists); such failures are listed in 'Errors',
    # so only report success when that list is empty.
    partition_errors = create_partition_response.get('Errors', [])
    if partition_errors:
        print('Glue partition creation failed for {} partition(s):'.format(len(partition_errors)))
        for partition_error in partition_errors:
            print(partition_error)
    else:
        print('Glue partition created successfully.')
    print(create_partition_response)
我有一个使用 create_partition() 创建新分区的 Glue 脚本。该 Glue 脚本运行成功，并且在 Athena 控制台中执行 SHOW PARTITIONS 时可以看到新分区。编写这个创建分区的 Glue 脚本时，我参考了这里的示例代码：https://medium.com/@bv_subhash/demystifying-the-ways-of-creating-partitions-in-glue-catalog-on-partitioned-s3-data-for-faster-e25671e65574
但当我在 Athena 中对新添加的分区运行查询时，没有返回任何结果。
即使已经使用 create_partition 添加了分区，是否仍然需要运行 MSCK 命令？欢迎任何建议。
我自己找到了解决方案，在此与 SO 社区分享，希望对他人有所帮助。以下代码作为 Glue 作业运行时会创建分区，并且新分区在 Athena 中可以被查询。请根据需要修改/添加参数值：数据库名称、表名称以及分区列。
import boto3
import urllib.parse
import os
import copy
import sys
# Job configuration. These are hard-coded placeholders; they could instead be
# supplied as workflow / job parameters.
DATABASE_NAME, TABLE_NAME = 'my_db', 'enter_table_name'
# Values of the emp_id and file_id partition columns for the new partition.
emp_id_tmp = file_id_tmp = ''
# Glue Data Catalog client used by the functions and statements below.
glue_client = boto3.client('glue')
def get_current_schema(database_name, table_name, *, client=None):
    """Fetch the parts of a Glue table definition needed to create partitions.

    Args:
        database_name: Glue Data Catalog database that contains the table.
        table_name: Name of the table to read.
        client: Optional Glue client (keyword-only). Defaults to the
            module-level ``glue_client``; pass one explicitly to reuse a
            differently configured client.

    Returns:
        dict with keys ``input_format``, ``output_format``,
        ``table_location``, ``serde_info`` and ``partition_keys`` copied from
        the table's ``StorageDescriptor`` / ``PartitionKeys``.

    If the ``get_table`` call raises, the error is printed and the process
    is terminated via ``sys.exit(-1)``.
    """
    if client is None:
        client = glue_client
    try:
        # Use the arguments, not the module-level DATABASE_NAME / TABLE_NAME
        # globals, so the function really reads the table it was asked for.
        response = client.get_table(DatabaseName=database_name, Name=table_name)
    except Exception as error:
        # Job-level boundary: surface why the lookup failed, then abort.
        print("Exception while fetching table info: {}".format(error))
        sys.exit(-1)

    table = response['Table']
    storage = table['StorageDescriptor']
    return {
        'input_format': storage['InputFormat'],
        'output_format': storage['OutputFormat'],
        'table_location': storage['Location'],
        'serde_info': storage['SerdeInfo'],
        'partition_keys': table['PartitionKeys'],
    }
def generate_partition_input_list(table_data, emp_id=None, file_id=None):
    """Build the ``PartitionInputList`` for ``batch_create_partition``.

    Args:
        table_data: dict as returned by ``get_current_schema``; must contain
            ``table_location``, ``input_format``, ``output_format`` and
            ``serde_info``.
        emp_id: Value of the ``emp_id`` partition column. Defaults to the
            module-level ``emp_id_tmp`` when omitted.
        file_id: Value of the ``file_id`` partition column. Defaults to the
            module-level ``file_id_tmp`` when omitted.

    Returns:
        A one-element list with the partition input for
        ``<table_location>/emp_id=<emp_id>/file_id=<file_id>/``, reusing the
        table's input/output formats and SerDe.
    """
    if emp_id is None:
        emp_id = emp_id_tmp
    if file_id is None:
        file_id = file_id_tmp

    # If the table location already ends with '/', naive joining would give
    # ".../table//emp_id=...", a different S3 prefix than the one holding the
    # data, so the partition would exist but Athena would read nothing from it.
    base_location = table_data['table_location'].rstrip('/')
    part_location = "{}/emp_id={}/file_id={}/".format(base_location, emp_id, file_id)

    partition_input = {
        # Glue expects string values, ordered like the table's partition keys.
        # NOTE(review): assumes PartitionKeys are (emp_id, file_id) in that
        # order, matching the path built above — confirm for your table.
        'Values': [str(emp_id), str(file_id)],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info'],
        },
    }
    return [partition_input]
# Create the partition(s) in the Glue Data Catalog from the table's settings.
table_data = get_current_schema(DATABASE_NAME, TABLE_NAME)
input_list = generate_partition_input_list(table_data)
try:
    create_partition_response = glue_client.batch_create_partition(
        DatabaseName=DATABASE_NAME,
        TableName=TABLE_NAME,
        PartitionInputList=input_list
    )
except Exception as e:
    # Handle exception as per your business requirements
    print(e)
else:
    # batch_create_partition does not raise when individual partitions fail
    # (e.g. one that already exists); such failures are listed in 'Errors',
    # so only report success when that list is empty.
    partition_errors = create_partition_response.get('Errors', [])
    if partition_errors:
        print('Glue partition creation failed for {} partition(s):'.format(len(partition_errors)))
        for partition_error in partition_errors:
            print(partition_error)
    else:
        print('Glue partition created successfully.')
    print(create_partition_response)