How to mock AWS S3 with aiobotocore
I have a project that uses aiohttp and aiobotocore to work with resources in AWS. I am trying to test a class that works with AWS S3, and I am using moto to mock AWS. The mocking works fine for the example that uses synchronous code (the example from the moto documentation):
import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')

        model_instance = MyModel('steve', 'is awesome')
        model_instance.save()

        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
        assert body == 'is awesome'
However, after rewriting it to use aiobotocore, the mocking no longer works: in my example it connects to the real AWS S3.
import aiobotocore
import asyncio
import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    async def save(self, loop):
        session = aiobotocore.get_session(loop=loop)
        s3 = session.create_client('s3', region_name='us-east-1')
        await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')

        loop = asyncio.get_event_loop()
        model_instance = MyModel('steve', 'is awesome')
        loop.run_until_complete(model_instance.save(loop=loop))

        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
        assert body == 'is awesome'
So my assumption is that moto does not work properly with aiobotocore. If my source code looks like the second example, how can I effectively mock AWS resources?
The mocks from moto don't work because they use the synchronous API. However, you can start a moto server and configure aiobotocore to connect to this test server.
Take a look at the aiobotocore tests for inspiration.
Here is the mock_server.py from aiobotocore, without pytest:
# Initially from https://raw.githubusercontent.com/aio-libs/aiobotocore/master/tests/mock_server.py
import shutil
import signal
import subprocess as sp
import sys
import time

import requests

_proxy_bypass = {
    "http": None,
    "https": None,
}


def start_service(service_name, host, port):
    moto_svr_path = shutil.which("moto_server")
    args = [sys.executable, moto_svr_path, service_name, "-H", host,
            "-p", str(port)]
    process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL)
    url = "http://{host}:{port}".format(host=host, port=port)

    for _ in range(30):
        if process.poll() is not None:
            break

        try:
            # we need to bypass the proxies due to monkeypatches
            requests.get(url, timeout=0.1, proxies=_proxy_bypass)
            break
        except requests.exceptions.RequestException:
            time.sleep(0.1)
    else:
        stop_process(process)
        raise AssertionError("Can not start service: {}".format(service_name))

    return process


def stop_process(process, timeout=20):
    try:
        process.send_signal(signal.SIGTERM)
        process.communicate(timeout=timeout / 2)
    except sp.TimeoutExpired:
        process.kill()
        outs, errors = process.communicate(timeout=timeout / 2)
        exit_code = process.returncode
        msg = "Child process finished {} not in clean way: {} {}" \
            .format(exit_code, outs, errors)
        raise RuntimeError(msg)
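With these helpers in place, the test itself points aiobotocore at the local moto endpoint instead of AWS via endpoint_url. Below is a minimal sketch of how that could look; the bucket name, port, and dummy credentials are placeholders (the client needs some credentials set, but the moto server does not validate them):

import asyncio

import aiobotocore

# start_service / stop_process are the helpers defined above


async def exercise_s3(endpoint_url):
    session = aiobotocore.get_session()
    # Point the client at the local moto server instead of AWS.
    async with session.create_client(
            's3', region_name='us-east-1', endpoint_url=endpoint_url,
            aws_access_key_id='dummy',
            aws_secret_access_key='dummy') as s3:
        await s3.create_bucket(Bucket='mybucket')
        await s3.put_object(Bucket='mybucket', Key='steve', Body=b'is awesome')
        resp = await s3.get_object(Bucket='mybucket', Key='steve')
        assert (await resp['Body'].read()) == b'is awesome'


def test_with_moto_server():
    process = start_service('s3', 'localhost', 5000)
    try:
        asyncio.get_event_loop().run_until_complete(
            exercise_s3('http://localhost:5000'))
    finally:
        stop_process(process)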
Using the AWS Stubber should do the trick. Here is how I perform an AWS read operation in a tornado application:
import io

import aiobotocore
import tornado.testing
from botocore.stub import Stubber
from tornado.testing import AsyncTestCase
from aiobotocore.response import StreamingBody


class RawStream(io.BytesIO):
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def read(self, n):
        return super().read(n)


class S3TestCase(AsyncTestCase):
    def setUp(self):
        super().setUp()
        self.binary_content = b"some file content"  # payload returned by the stub
        session = aiobotocore.get_session()
        self.client = session.create_client("s3", region_name="AWS_S3_REGION",
                                            aws_secret_access_key="AWS_SECRET_ACCESS_KEY",
                                            aws_access_key_id="AWS_ACCESS_KEY_ID")

    @tornado.testing.gen_test
    async def test_read(self):
        stubber = Stubber(self.client)
        stubber.add_response("get_object",
                             {"Body": StreamingBody(raw_stream=RawStream(self.binary_content),
                                                    content_length=128),
                              "ContentLength": 128},
                             expected_params={"Bucket": "AWS_S3_BUCKET",
                                              "Key": "filename"})
        stubber.activate()
        response = await self.client.get_object(Bucket="AWS_S3_BUCKET", Key="filename")
Write operations should look similar. Hopefully this points you in the right direction.
More information on the Stubber: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/stubber.html
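For instance, a stubbed write could look roughly like the sketch below, which is another test method that would slot into the S3TestCase above. The ETag value and the bucket and key names are placeholders:

    @tornado.testing.gen_test
    async def test_write(self):
        stubber = Stubber(self.client)
        stubber.add_response(
            "put_object",
            {"ETag": '"placeholder-etag"'},  # minimal stubbed response
            expected_params={"Bucket": "AWS_S3_BUCKET",
                             "Key": "filename",
                             "Body": self.binary_content})
        stubber.activate()
        response = await self.client.put_object(Bucket="AWS_S3_BUCKET",
                                                Key="filename",
                                                Body=self.binary_content)
        assert response["ETag"] == '"placeholder-etag"'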
I think Sebastian Brestins' answer should be the accepted one. I am posting this new answer because a few things have changed since it was posted: for example, Python 3.8 now supports asynchronous test cases, and the aioboto3 client is now a context manager.
A minimal example using Python 3.8 looks like this:
from unittest import IsolatedAsyncioTestCase

import aioboto3
from botocore.stub import Stubber


class Test(IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        self._s3_client = await aioboto3.client('s3').__aenter__()
        self._s3_stub = Stubber(self._s3_client)

    async def asyncTearDown(self):
        await self._s3_client.__aexit__(None, None, None)

    async def test_case(self):
        self._s3_stub.add_response(
            "get_object",
            {"Body": "content"},
            expected_params={"Bucket": "AWS_S3_BUCKET", "Key": "filename"}
        )
        self._s3_stub.activate()
        response = await self._s3_client.get_object(Bucket="AWS_S3_BUCKET", Key="filename")
        self.assertEqual(response["Body"], "content")
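The same Stubber can also simulate error responses via its add_client_error method. A rough sketch of another test method for the class above (the bucket and key names are placeholders):

    async def test_missing_key(self):
        # Simulate S3 returning a NoSuchKey error for a missing object.
        self._s3_stub.add_client_error(
            "get_object",
            service_error_code="NoSuchKey",
            service_message="The specified key does not exist.",
            http_status_code=404,
            expected_params={"Bucket": "AWS_S3_BUCKET", "Key": "missing"},
        )
        self._s3_stub.activate()
        from botocore.exceptions import ClientError
        with self.assertRaises(ClientError):
            await self._s3_client.get_object(Bucket="AWS_S3_BUCKET", Key="missing")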
We can create an S3 server using moto[server] and then build a pytest fixture around it, similar to what aioboto3 does:
import pytest

# start_service and stop_process are the helpers from mock_server.py above


@pytest.yield_fixture(scope='session')
def s3_server():
    host = 'localhost'
    port = 5002
    url = 'http://{host}:{port}'.format(host=host, port=port)
    process = start_service('s3', host, port)
    yield url
    stop_process(process)
Then patch 'aiobotocore.AioSession.create_client' so that its return_value is the client from aiobotocore.get_session().create_client('s3', region_name='us-east-1', endpoint_url=s3_server), as follows:
async with aiobotocore.get_session().create_client(
        's3', region_name='us-east-1', endpoint_url=s3_server) as client:
    with patch('aiobotocore.AioSession.create_client') as mock:
        mock.return_value = client
        # Test your code: anything that calls AioSession.create_client
        # now receives the client pointed at the local moto server.
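Put together, a full test could look roughly like the sketch below. This assumes pytest-asyncio for the async test function, the MyModel class from the question, and the s3_server fixture above; the dummy credentials are only there because the client requires some, while moto does not check them:

import asyncio
from unittest.mock import patch

import aiobotocore
import pytest


@pytest.mark.asyncio
async def test_my_model_save(s3_server):
    session = aiobotocore.get_session()
    async with session.create_client(
            's3', region_name='us-east-1', endpoint_url=s3_server,
            aws_access_key_id='dummy',
            aws_secret_access_key='dummy') as client:
        await client.create_bucket(Bucket='mybucket')

        with patch('aiobotocore.AioSession.create_client') as mock:
            mock.return_value = client
            # MyModel.save() creates its own client, which is now the one
            # pointed at the moto server.
            model_instance = MyModel('steve', 'is awesome')
            await model_instance.save(loop=asyncio.get_event_loop())

        resp = await client.get_object(Bucket='mybucket', Key='steve')
        body = (await resp['Body'].read()).decode('utf-8')
        assert body == 'is awesome'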
Unfortunately this is not a complete answer, but there is a pull request adding this feature that has been open for six months: https://github.com/aio-libs/aiobotocore/pull/766
When I was dealing with a similar problem with asyncio, I wrote "async" wrappers for the synchronous objects by hand.
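Such a wrapper can be as simple as pushing the synchronous boto3 call onto a thread executor so it can be awaited, while the underlying calls remain plain boto3 that moto knows how to mock. A minimal sketch (the class and method selection are made up for illustration):

import asyncio
import functools

import boto3


class AsyncS3Wrapper:
    """Runs synchronous boto3 calls in a thread executor so they can be awaited."""

    def __init__(self, region_name='us-east-1'):
        self._client = boto3.client('s3', region_name=region_name)

    async def put_object(self, **kwargs):
        loop = asyncio.get_event_loop()
        # Delegate to the synchronous client in a worker thread.
        return await loop.run_in_executor(
            None, functools.partial(self._client.put_object, **kwargs))

    async def get_object(self, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None, functools.partial(self._client.get_object, **kwargs))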