从 Notebook 实例查询 Athena 中的 table/database

Query a table/database in Athena from a Notebook instance

我为不同的团队开发了不同的 Athena 工作组,这样我就可以将他们的查询和他们的查询结果分开。用户想从他们的笔记本实例 (JupyterLab) 中查询可用的 tables。我很难找到成功满足从用户特定工作组查询 table 的要求的代码。我只找到了将从主要工作组查询 table 的代码。

下面添加我目前使用的代码。

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
region_name='<YOUR REGION, for example, us-west-2>')


df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
df

此代码不起作用,因为用户只能访问其特定工作组的查询,因此当此代码为 运行 时会出错。它也不包括在用户特定工作组中分离用户查询的要求。

关于如何添加更改代码以便我可以从笔记本实例运行特定工作组内的查询的任何建议?

pyathena 的文档不是非常广泛,但在查看源代码后我们可以看到 connect 只是创建了 Connection class.[=26 的实例=]

def connect(*args, **kwargs):
    from pyathena.connection import Connection
    return Connection(*args, **kwargs)

现在,在 GitHub we can see parameter work_group=None which name in the same way as one of the parameters for start_query_execution from the official AWS Python API boto3 上查看 Connection.__init__ 的签名后。以下是他们的文档对此的描述:

WorkGroup (string) -- The name of the workgroup in which the query is being started.

在遵循 Connection 中的用法和导入之后,我们最终得到 BaseCursor class,在解包参数组装的字典时,在引擎盖下调用 start_query_execution通过 BaseCursor._build_start_query_execution_request 方法。这正是我们可以看到向 ​​AWS Athena 提交查询的熟悉语法的地方,特别是以下部分:

if self._work_group or work_group:
    request.update({
        'WorkGroup': work_group if work_group else self._work_group
    })

所以这应该对你的情况有用:

import pandas as pd
from pyathena import connect


conn = connect(
    s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
    region_name='<YOUR REGION, for example, us-west-2>',
    work_group='<USER SPECIFIC WORKGROUP>'
)

df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)