对 AWS Athena 查询结果进行分页时如何跳过 header
How to skip header when paginating through AWS Athena query results
我有一个 Angular 6 应用程序,它从 AWS Lambda 请求数据。数据本身存储在 Glue 数据库中并使用 AWS Athena 进行查询。
AWS Glue 数据库设置了 skip.header.line.count=1
选项,当我在控制台中 运行 Athena 查询时,我得到的响应没有 header。
当我尝试使用 boto3
检索数据时出现问题。我有一个函数,该函数 运行 是一个查询,然后对结果进行分页:
def run_query_paged(self, query, page_token=None, page_size=10):
"""
Run query.
"""
request = self.athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': self.database
},
ResultConfiguration={
'OutputLocation': self.s3_output,
}
)
execution_id = request['QueryExecutionId']
if execution_id:
while True:
stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(0.2) # 200ms
if status == 'SUCCEEDED':
paginator = self.athena_client.get_paginator('get_query_results')
pagination_config = {
'MaxItems': page_size,
'PageSize': page_size,
}
if page_token:
pagination_config['StartingToken'] = page_token
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig=pagination_config
)
for page in response_iterator:
next_token = page.get('NextToken', '')
results = page
break
return {
'rows': process_results(results),
'nextToken': next_token
}
if status == 'FAILED':
raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])
return None
process_results
函数将响应转换为考虑列类型的列表:
def process_results(response):
"""
Processes the result of get_query_results function
"""
rows = response['ResultSet']['Rows']
meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
result = []
for row in rows:
parsed_row = {}
for idx, val in enumerate(row['Data']):
field = val
column_info = meta[idx]
if 'VarCharValue' in val:
value = val['VarCharValue']
else:
value = ''
parsed_row[column_info['Name']] = process_row_value(value, column_info)
result.append(parsed_row)
return result
问题是分页响应的第一页 header 的列名称如下:
{
"foo": "foo",
"bar": "bar"
},
{
"foo": 1,
"bar": 2
},
...
而所有其他页面都没有。当我从我的客户端应用程序请求第一页时,我得到 header 加上 9 行(页面大小为 10),当我使用 NextToken
请求下一页时,我得到 10 行而没有 header。第一页显示9项,后页显示10项是很尴尬的。
如何跳过 header 对结果进行分页?
我没有找到任何跳过 header 的选项,并通过在第一个请求中请求 page_size + 1
结果,然后在其余请求中请求 page_size
来破解它。
def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
"""
Returns the query result for the provided page as well as a token to the next page if there are more
results to retrieve for the query.
"""
paginator = self.athena_client.get_paginator('get_query_results')
# The first page of response contains header. Increase the page size for a first page and then
# remove header so that all the pages would have the same size.
if starting_token:
skip_header = False
else:
page_size += 1
skip_header = True
max_items = page_size * 2
pagination_config = {
'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
}
if starting_token:
pagination_config['StartingToken'] = starting_token
response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)
iterator_index = 0
results = EMPTY_ATHENA_RESPONSE
next_token = None
# Retrieve only a single page and return the next token for the caller to iterate the response.
for page in response_iterator:
if iterator_index > 0:
if len(page['ResultSet']['Rows']) == 0:
next_token = None
break
next_token = page.get('NextToken')
results = page
iterator_index += 1
# ... process and return results
图我会添加我的。我将其分为三个部分 - 启动查询、对查询结果页面进行分页以及将结果标准化为 lists/dictionaries:
列表
import boto3
import logging
from time import sleep
def query_athena_table(sql_query, database, **kwargs):
client = boto3.client('athena')
query_started = client.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={'Database': database},
ResultConfiguration={"OutputLocation": f"s3://your-specific-athena-query-results-bucket"}
)
timeout_value = kwargs.get("timeout", 15) * 1000 # bc its in milliseconds
finished = False
logging.info("Started Athena Query")
while not finished:
query_in_flight = client.get_query_execution(QueryExecutionId=query_started["QueryExecutionId"])
query_status = query_in_flight["QueryExecution"]["Status"]["State"]
if query_status == 'SUCCEEDED':
finished = True
elif query_status in ['FAILED', 'CANCELLED']:
logging.error(query_in_flight['QueryExecution']['Status']['StateChangeReason'])
return None
elif timeout_value < ez_get(query_in_flight, "QueryExecution", "Statistics", "TotalExecutionTimeInMillis"):
logging.warning(f"Query timed out with no response (timeout val: {timeout_value})")
return None
else:
sleep(kwargs.get("wait_interval", 0.1))
return paginate_athena_response(client, query_started["QueryExecutionId"], **kwargs)
# about 4s per 10k rows, with a floor of ~0.33s if only one page
def paginate_athena_response(client, execution_id: str, **kwargs):
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig={
'MaxItems': kwargs.get("max_results", 100000),
'PageSize': 1000,
'StartingToken': kwargs.get("pagination_starting_token", None),
})
results = []
# Iterate through pages. The NextToken logic is handled for you.
for n, page in enumerate(response_iterator):
logging.info(f"Now on page {n}, rows on this page: {len(page['ResultSet']['Rows'])}")
if n > 0 and len(page['ResultSet']['Rows']) == 0: # probably redundant
break
results += standardize_athena_query_result(page, **kwargs)
kwargs["headers"] = list(results[0].keys()) # prevent parser from .pop(0) after 1st page
return results
def standardize_athena_query_result(results, **kwargs):
results = [x["Data"] for x in results['ResultSet']['Rows']]
for n, row in enumerate(results):
results[n] = [x['VarCharValue'] for x in row]
if kwargs.get("output_lod"):
headers = kwargs.get("headers") or results.pop(0)
output_lod = []
for n, result_row in enumerate(results):
output_lod.append({headers[i]:result_row[i] for i in range(0, len(result_row))})
return output_lod
return results
我有一个 Angular 6 应用程序,它从 AWS Lambda 请求数据。数据本身存储在 Glue 数据库中并使用 AWS Athena 进行查询。
AWS Glue 数据库设置了 skip.header.line.count=1
选项,当我在控制台中 运行 Athena 查询时,我得到的响应没有 header。
当我尝试使用 boto3
检索数据时出现问题。我有一个函数,该函数 运行 是一个查询,然后对结果进行分页:
def run_query_paged(self, query, page_token=None, page_size=10):
"""
Run query.
"""
request = self.athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': self.database
},
ResultConfiguration={
'OutputLocation': self.s3_output,
}
)
execution_id = request['QueryExecutionId']
if execution_id:
while True:
stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(0.2) # 200ms
if status == 'SUCCEEDED':
paginator = self.athena_client.get_paginator('get_query_results')
pagination_config = {
'MaxItems': page_size,
'PageSize': page_size,
}
if page_token:
pagination_config['StartingToken'] = page_token
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig=pagination_config
)
for page in response_iterator:
next_token = page.get('NextToken', '')
results = page
break
return {
'rows': process_results(results),
'nextToken': next_token
}
if status == 'FAILED':
raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])
return None
process_results
函数将响应转换为考虑列类型的列表:
def process_results(response):
"""
Processes the result of get_query_results function
"""
rows = response['ResultSet']['Rows']
meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
result = []
for row in rows:
parsed_row = {}
for idx, val in enumerate(row['Data']):
field = val
column_info = meta[idx]
if 'VarCharValue' in val:
value = val['VarCharValue']
else:
value = ''
parsed_row[column_info['Name']] = process_row_value(value, column_info)
result.append(parsed_row)
return result
问题是分页响应的第一页 header 的列名称如下:
{
"foo": "foo",
"bar": "bar"
},
{
"foo": 1,
"bar": 2
},
...
而所有其他页面都没有。当我从我的客户端应用程序请求第一页时,我得到 header 加上 9 行(页面大小为 10),当我使用 NextToken
请求下一页时,我得到 10 行而没有 header。第一页显示9项,后页显示10项是很尴尬的。
如何跳过 header 对结果进行分页?
我没有找到任何跳过 header 的选项,并通过在第一个请求中请求 page_size + 1
结果,然后在其余请求中请求 page_size
来破解它。
def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
"""
Returns the query result for the provided page as well as a token to the next page if there are more
results to retrieve for the query.
"""
paginator = self.athena_client.get_paginator('get_query_results')
# The first page of response contains header. Increase the page size for a first page and then
# remove header so that all the pages would have the same size.
if starting_token:
skip_header = False
else:
page_size += 1
skip_header = True
max_items = page_size * 2
pagination_config = {
'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
}
if starting_token:
pagination_config['StartingToken'] = starting_token
response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)
iterator_index = 0
results = EMPTY_ATHENA_RESPONSE
next_token = None
# Retrieve only a single page and return the next token for the caller to iterate the response.
for page in response_iterator:
if iterator_index > 0:
if len(page['ResultSet']['Rows']) == 0:
next_token = None
break
next_token = page.get('NextToken')
results = page
iterator_index += 1
# ... process and return results
图我会添加我的。我将其分为三个部分 - 启动查询、对查询结果页面进行分页以及将结果标准化为 lists/dictionaries:
列表import boto3
import logging
from time import sleep
def query_athena_table(sql_query, database, **kwargs):
client = boto3.client('athena')
query_started = client.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={'Database': database},
ResultConfiguration={"OutputLocation": f"s3://your-specific-athena-query-results-bucket"}
)
timeout_value = kwargs.get("timeout", 15) * 1000 # bc its in milliseconds
finished = False
logging.info("Started Athena Query")
while not finished:
query_in_flight = client.get_query_execution(QueryExecutionId=query_started["QueryExecutionId"])
query_status = query_in_flight["QueryExecution"]["Status"]["State"]
if query_status == 'SUCCEEDED':
finished = True
elif query_status in ['FAILED', 'CANCELLED']:
logging.error(query_in_flight['QueryExecution']['Status']['StateChangeReason'])
return None
elif timeout_value < ez_get(query_in_flight, "QueryExecution", "Statistics", "TotalExecutionTimeInMillis"):
logging.warning(f"Query timed out with no response (timeout val: {timeout_value})")
return None
else:
sleep(kwargs.get("wait_interval", 0.1))
return paginate_athena_response(client, query_started["QueryExecutionId"], **kwargs)
# about 4s per 10k rows, with a floor of ~0.33s if only one page
def paginate_athena_response(client, execution_id: str, **kwargs):
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig={
'MaxItems': kwargs.get("max_results", 100000),
'PageSize': 1000,
'StartingToken': kwargs.get("pagination_starting_token", None),
})
results = []
# Iterate through pages. The NextToken logic is handled for you.
for n, page in enumerate(response_iterator):
logging.info(f"Now on page {n}, rows on this page: {len(page['ResultSet']['Rows'])}")
if n > 0 and len(page['ResultSet']['Rows']) == 0: # probably redundant
break
results += standardize_athena_query_result(page, **kwargs)
kwargs["headers"] = list(results[0].keys()) # prevent parser from .pop(0) after 1st page
return results
def standardize_athena_query_result(results, **kwargs):
results = [x["Data"] for x in results['ResultSet']['Rows']]
for n, row in enumerate(results):
results[n] = [x['VarCharValue'] for x in row]
if kwargs.get("output_lod"):
headers = kwargs.get("headers") or results.pop(0)
output_lod = []
for n, result_row in enumerate(results):
output_lod.append({headers[i]:result_row[i] for i in range(0, len(result_row))})
return output_lod
return results