Pytest with Moto,使用后端更改雅典娜查询的状态
Pytest with Moto, change the status of an athena query using the backend
我正在使用 moto 测试代码库中的 aws 功能。我遇到的问题之一 运行 是在测试 athena 时,查询状态无限期地停留在“QUEUED”,导致测试失败或超时。
这里是要测试的方法:
import time
import boto3
class Athena:
CLIENT = boto3.client("athena")
class QueryError(Exception):
"""A class for exceptions related to queries."""
@classmethod
def execute_query(cls, query, result_location, check_status=True,
time_limit=10):
"""
Execute a query in Athena.
"""
_result_configuration = {"OutputLocation": result_location}
_kwargs = {"QueryString": query, "ResultConfiguration":
_result_configuration}
response = cls.CLIENT.start_query_execution(**_kwargs)
query_id = response["QueryExecutionId"]
if check_status:
old_time = time.time()
while True:
status = cls.CLIENT.get_query_execution(
QueryExecutionId=query_id)
status = status["QueryExecution"]["Status"]["State"]
if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
if status == "FAILED":
raise cls.QueryError("error")
break
time.sleep(0.2) # 200ms
if time.time() - old_time > time_limit and status
== "QUEUED":
raise cls.QueryError("time limit reached")
return query_id
这里是通过测试的fixture
from moto.s3 import mock_s3
import boto3
@pytest.fixture
def s3():
with mock_s3():
s3 = boto3.client("s3")
yield s3
这里是测试(记住你需要用上面的方法把from x
改成模块)
import uuid
import boto3
import pytest
from moto.athena import mock_athena
from moto.s3 import mock_s3
@mock_s3
@mock_athena
def test_execute_query_check(s3):
from x import Athena
"""
Test for 'execute_query' (with status check)
"""
CLIENT = s3
bucket_name = "pytest." + str(uuid.uuid4())
# Bucket creation
bucket_config = {"LocationConstraint": "us-east-2"}
CLIENT.create_bucket(Bucket=bucket_name,
CreateBucketConfiguration=bucket_config)
waiter = CLIENT.get_waiter("bucket_exists")
waiter.wait(Bucket=bucket_name)
s3_location = f"s3://{bucket_name}/"
query = "SELECT current_date, current_time;"
query_id = Athena.execute_query(query, s3_location,
check_status=True)
assert query_id
此测试失败,因为 moto
未更改 "QUEUED"
之后的查询状态,并且测试期望更改为状态,否则会触发异常。
我希望能够做类似的事情:
from moto.athena import athena_backends
athena_backends['us-east-2'].job_flows[query_id].state = "SUCCEEDED"
如本期所建议:https://github.com/spulec/moto/issues/380
然而,boto3 mapreduce 后端似乎不再存在“工作流程”属性,我找不到明确更改它的方法。
理想情况下,这将能够在测试的某个地方发生,以手动更改查询的状态以模拟实际资源的情况。
在我看来moto
只有returns QUEUED
为start_query_execution
,你可以看看源码here。
另一种方法是使用 from unittest import mock
,然后您可以执行以下操作:
cls.CLIENT = mock.Mock()
cls.CLIENT.start_query_execution.side_effect = [
'QUEUED',
'SUCCEEDED'
]
那么,第一次调用cls.CLIENT.start_query_execution(..)
,会return查询排队,第二次return会成功,然后就是能够测试两个路径执行。
而且,对于moto
,它无法测试所有情况,因为除了排队状态,您只能将查询状态设置为CANCELLED
,如您所见here.
状态可以访问和改变如下:
athena_backends['us-east-2'].executions.get(query_id).status
示例代码片段
from moto.athena import athena_backends
query = "SELECT stuff"
location = "s3://bucket-name/prefix/"
database = "database"
# Start Query
exex_id = self.client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
ResultConfiguration={"OutputLocation": location},
)["QueryExecutionId"]
athena_backends['us-west-2'].executions.get(exex_id).status = "CANCELLED"
我正在使用 moto 测试代码库中的 aws 功能。我遇到的问题之一 运行 是在测试 athena 时,查询状态无限期地停留在“QUEUED”,导致测试失败或超时。
这里是要测试的方法:
import time
import boto3
class Athena:
CLIENT = boto3.client("athena")
class QueryError(Exception):
"""A class for exceptions related to queries."""
@classmethod
def execute_query(cls, query, result_location, check_status=True,
time_limit=10):
"""
Execute a query in Athena.
"""
_result_configuration = {"OutputLocation": result_location}
_kwargs = {"QueryString": query, "ResultConfiguration":
_result_configuration}
response = cls.CLIENT.start_query_execution(**_kwargs)
query_id = response["QueryExecutionId"]
if check_status:
old_time = time.time()
while True:
status = cls.CLIENT.get_query_execution(
QueryExecutionId=query_id)
status = status["QueryExecution"]["Status"]["State"]
if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
if status == "FAILED":
raise cls.QueryError("error")
break
time.sleep(0.2) # 200ms
if time.time() - old_time > time_limit and status
== "QUEUED":
raise cls.QueryError("time limit reached")
return query_id
这里是通过测试的fixture
from moto.s3 import mock_s3
import boto3
@pytest.fixture
def s3():
with mock_s3():
s3 = boto3.client("s3")
yield s3
这里是测试(记住你需要用上面的方法把from x
改成模块)
import uuid
import boto3
import pytest
from moto.athena import mock_athena
from moto.s3 import mock_s3
@mock_s3
@mock_athena
def test_execute_query_check(s3):
from x import Athena
"""
Test for 'execute_query' (with status check)
"""
CLIENT = s3
bucket_name = "pytest." + str(uuid.uuid4())
# Bucket creation
bucket_config = {"LocationConstraint": "us-east-2"}
CLIENT.create_bucket(Bucket=bucket_name,
CreateBucketConfiguration=bucket_config)
waiter = CLIENT.get_waiter("bucket_exists")
waiter.wait(Bucket=bucket_name)
s3_location = f"s3://{bucket_name}/"
query = "SELECT current_date, current_time;"
query_id = Athena.execute_query(query, s3_location,
check_status=True)
assert query_id
此测试失败,因为 moto
未更改 "QUEUED"
之后的查询状态,并且测试期望更改为状态,否则会触发异常。
我希望能够做类似的事情:
from moto.athena import athena_backends
athena_backends['us-east-2'].job_flows[query_id].state = "SUCCEEDED"
如本期所建议:https://github.com/spulec/moto/issues/380
然而,boto3 mapreduce 后端似乎不再存在“工作流程”属性,我找不到明确更改它的方法。 理想情况下,这将能够在测试的某个地方发生,以手动更改查询的状态以模拟实际资源的情况。
在我看来moto
只有returns QUEUED
为start_query_execution
,你可以看看源码here。
另一种方法是使用 from unittest import mock
,然后您可以执行以下操作:
cls.CLIENT = mock.Mock()
cls.CLIENT.start_query_execution.side_effect = [
'QUEUED',
'SUCCEEDED'
]
那么,第一次调用cls.CLIENT.start_query_execution(..)
,会return查询排队,第二次return会成功,然后就是能够测试两个路径执行。
而且,对于moto
,它无法测试所有情况,因为除了排队状态,您只能将查询状态设置为CANCELLED
,如您所见here.
状态可以访问和改变如下:
athena_backends['us-east-2'].executions.get(query_id).status
示例代码片段
from moto.athena import athena_backends
query = "SELECT stuff"
location = "s3://bucket-name/prefix/"
database = "database"
# Start Query
exex_id = self.client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
ResultConfiguration={"OutputLocation": location},
)["QueryExecutionId"]
athena_backends['us-west-2'].executions.get(exex_id).status = "CANCELLED"