Querying Athena from Lambda function - QUEUED state?
I've been successfully querying S3 via Athena from inside a Lambda function for quite some time, but it has suddenly stopped working. Further investigation shows that the response from get_query_execution() is returning a state of 'QUEUED' (which I was led to believe is not used?!)
My code is as follows:
def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })
    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))

    state = 'RUNNING'
    while (max_execution > 0 and state in ['RUNNING']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)

                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise

                return json.dumps(rows)
            elif state == 'FAILED':
                return False

        time.sleep(10)

    return False
So it was clearly working properly - it's just that the 'QUEUED' state is completely unexpected and I don't know what to do about it? What causes a query_execution to become 'QUEUED', and what does my code need to change to accommodate it?
Take a look at the Athena hook in Apache Airflow. Athena has terminal states (SUCCEEDED, FAILED and CANCELLED) and intermediate states - RUNNING and QUEUED. QUEUED is a normal state for a query before it starts running. So you could use code like this:
def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })
    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))

    state = 'QUEUED'
    while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)

                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise

                return json.dumps(rows)
            elif state == 'FAILED' or state == 'CANCELLED':
                return False

        time.sleep(10)

    return False
Got this response from AWS - a change to Athena caused the issue (even though QUEUED has been in the state enum for some time, it was not used until now):
The Athena team recently deployed a host of new functionality for Athena, including more granular CloudWatch metrics for Athena queries.
For more information:
AWS What's New page
Athena docs on CloudWatch metrics
As part of the deployment of more granular metrics, Athena now includes a QUEUED status for queries. This status indicates that an Athena query is waiting for resources to be allocated for processing. Query flow is roughly:
SUBMITTED -> QUEUED -> RUNNING -> COMPLETED/FAILED
Note that queries that fail due to a system error can be put back into the queue and retried.
I apologise for the frustration that this change has caused.
It looks like the forum formatting has stripped some elements from your code snippets.
However, I believe your WHILE loop is working off an array of possible query states, which previously did not accommodate QUEUED.
If that is the case, then yes, adding QUEUED to that array will allow your application to handle the new state.
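To make that idea concrete, here is a minimal sketch (not the poster's exact code) of a polling loop that treats both QUEUED and RUNNING as intermediate states and only exits on a terminal one. The state fetcher is injected as a callable, so in a real deployment you would pass something like lambda: client.get_query_execution(QueryExecutionId=eid)['QueryExecution']['Status']['State']; here a fake sequence of states stands in for AWS so the loop can be demonstrated without credentials.

```python
import time

# Terminal vs intermediate states per the Athena state flow described above.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}
INTERMEDIATE_STATES = {"QUEUED", "RUNNING"}

def wait_for_query(get_state, max_polls=5, delay=0.0):
    """Poll get_state() until a terminal state or until polls run out.

    get_state: zero-argument callable returning the current state string.
    Returns the terminal state, or None if the query is still in an
    intermediate state after max_polls attempts.
    """
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        # QUEUED and RUNNING both fall through to another poll.
        time.sleep(delay)
    return None

# Simulate a query that sits in QUEUED before eventually succeeding:
states = iter(["QUEUED", "QUEUED", "RUNNING", "SUCCEEDED"])
result = wait_for_query(lambda: next(states), max_polls=5)
print(result)  # SUCCEEDED
```

The key design point is simply that the loop condition tests membership in a set of intermediate states rather than comparing against RUNNING alone, which is exactly the one-line fix the original code needed.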