从 Lambda 函数查询 Athena - QUEUED 状态?

Querying Athena from Lambda function - QUEUED state?

我已经在 lambda 函数内部通过 athena 成功查询 s3 很长一段时间了,但它突然停止工作了。进一步调查显示来自 get_query_execution() 的响应返回 'QUEUED' 状态(我被引导相信没有使用?!)

我的代码如下:

def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
    })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    state  = 'RUNNING'

    while (max_execution > 0 and state in ['RUNNING']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId = execution_id)    

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

                state = response['QueryExecution']['Status']['State']
                print(state)
                if state == 'SUCCEEDED':
                    print("Query SUCCEEDED: {}".format(execution_id))

                    s3_key = 'athena_output/' + execution_id + '.csv'
                    print(s3_key)
                    local_filename = '/tmp/' + execution_id + '.csv'
                    print(local_filename)

                    rows = []
                    try:
                        print("s3key =" + s3_key)
                        print("localfilename = " + local_filename)
                        s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                        with open(local_filename) as csvfile:
                            reader = csv.DictReader(csvfile)
                            for row in reader:
                                rows.append(row)
                    except botocore.exceptions.ClientError as e:
                        if e.response['Error']['Code'] == "404":
                            print("The object does not exist.")
                            print(e)
                        else:
                            raise
                    return json.dumps(rows)
                elif state == 'FAILED':
                    return False
        time.sleep(10)
    return False

所以它显然在正常工作 - 只是 'QUEUED' 状态完全出乎意料,我不知道该怎么办?是什么导致 query_execution 变成 'QUEUED' 以及我的代码需要更改什么以适应它?

查看 Apache Airflow 中的 Athena hook。 Athena 有最终状态(SUCCEEDED、FAILED 和 CANCELLED)和中间状态 - 运行 和 QUEUED。 QUEUED 是查询开始之前的正常状态。所以你可以使用这样的代码:

def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
    })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    state  = 'QUEUED'

    while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId = execution_id)    

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

                state = response['QueryExecution']['Status']['State']
                print(state)
                if state == 'SUCCEEDED':
                    print("Query SUCCEEDED: {}".format(execution_id))

                    s3_key = 'athena_output/' + execution_id + '.csv'
                    print(s3_key)
                    local_filename = '/tmp/' + execution_id + '.csv'
                    print(local_filename)

                    rows = []
                    try:
                        print("s3key =" + s3_key)
                        print("localfilename = " + local_filename)
                        s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                        with open(local_filename) as csvfile:
                            reader = csv.DictReader(csvfile)
                            for row in reader:
                                rows.append(row)
                    except botocore.exceptions.ClientError as e:
                        if e.response['Error']['Code'] == "404":
                            print("The object does not exist.")
                            print(e)
                        else:
                            raise
                    return json.dumps(rows)
                elif state == 'FAILED' or state == 'CANCELLED':
                    return False
        time.sleep(10)
    return False

从 AWS 得到了这个响应 - Athena 的变化导致了这个问题(尽管 QUEUED 已经在状态枚举中有一段时间了,直到现在才被使用):

Athena 团队最近为 Athena 部署了一系列新功能,包括用于 Athena 查询的更精细的 CloudWatch 指标。

更多信息:

作为更精细指标部署的一部分,Athena 现在包含查询的 QUEUED 状态。此状态表示 Athena 查询正在等待分配资源进行处理。查询流程大致为:

SUBMITTED -> QUEUED -> RUNNING -> COMPLETED/FAILED

请注意,由于系统错误而失败的查询可以放回队列中并重试。

对于此更改造成的挫败感,我深表歉意。

论坛格式似乎从您的代码片段中删除了一些元素。 但是,我认为您的 WHILE 循环正在处理一组可能的查询状态,这在以前不适合 QUEUED。 如果是这种情况,那么是的,将 QUEUED 添加到该数组将允许您的应用程序处理新状态。