如何使用 boto3 和 moto 测试方法
how do I test methods using boto3 with moto
我正在编写测试用例以快速 class 使用 boto3 从 s3 查找/获取密钥。我过去曾使用 moto 来测试 boto(不是 3)代码,但我正在尝试将此项目转移到 boto3,并且 运行 遇到问题:
class TestS3Actor(unittest.TestCase):
@mock_s3
def setUp(self):
self.bucket_name = 'test_bucket_01'
self.key_name = 'stats_com/fake_fake/test.json'
self.key_contents = 'This is test data.'
s3 = boto3.session.Session().resource('s3')
s3.create_bucket(Bucket=self.bucket_name)
s3.Object(self.bucket_name, self.key_name).put(Body=self.key_contents)
错误:
...
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 344, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 314, in _raise_timeout
if 'timed out' in str(err) or 'did not complete (read)' in str(err): # Python 2.6
TypeError: __str__ returned non-string (type WantWriteError)
botocore.hooks: DEBUG: Event needs-retry.s3.CreateBucket: calling handler <botocore.retryhandler.RetryHandler object at 0x10ce75310>
moto 似乎没有正确模拟 boto3 调用 - 我该如何让它工作?
对我有用的是在 运行 使用 boto3
进行模拟测试之前使用 boto
设置环境。
这是一个工作片段:
import unittest
import boto
from boto.s3.key import Key
from moto import mock_s3
import boto3
class TestS3Actor(unittest.TestCase):
mock_s3 = mock_s3()
def setUp(self):
self.mock_s3.start()
self.location = "eu-west-1"
self.bucket_name = 'test_bucket_01'
self.key_name = 'stats_com/fake_fake/test.json'
self.key_contents = 'This is test data.'
s3 = boto.connect_s3()
bucket = s3.create_bucket(self.bucket_name, location=self.location)
k = Key(bucket)
k.key = self.key_name
k.set_contents_from_string(self.key_contents)
def tearDown(self):
self.mock_s3.stop()
def test_s3_boto3(self):
s3 = boto3.resource('s3', region_name=self.location)
bucket = s3.Bucket(self.bucket_name)
assert bucket.name == self.bucket_name
# retrieve already setup keys
keys = list(bucket.objects.filter(Prefix=self.key_name))
assert len(keys) == 1
assert keys[0].key == self.key_name
# update key
s3.Object(self.bucket_name, self.key_name).put(Body='new')
key = s3.Object(self.bucket_name, self.key_name).get()
assert 'new' == key['Body'].read()
当 运行 和 py.test test.py
时,您得到以下输出:
collected 1 items
test.py .
========================================================================================= 1 passed in 2.22 seconds =========================================================================================
根据this信息,貌似还不支持使用Boto3 S3 Put流式上传到s3。
就我而言,我使用以下方法成功将对象上传到存储桶:
s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("file_to_upload", 'rb'))
其中 "file_to_upload" 是您要上传到 s3 存储桶的本地文件。对于您的测试用例,您可以创建一个临时文件来检查此功能:
test_file = open("test_file.json", "w")
test_file.write("some test contents")
test_file.close()
s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("test_file", 'rb'))
我正在编写测试用例以快速 class 使用 boto3 从 s3 查找/获取密钥。我过去曾使用 moto 来测试 boto(不是 3)代码,但我正在尝试将此项目转移到 boto3,并且 运行 遇到问题:
class TestS3Actor(unittest.TestCase):
@mock_s3
def setUp(self):
self.bucket_name = 'test_bucket_01'
self.key_name = 'stats_com/fake_fake/test.json'
self.key_contents = 'This is test data.'
s3 = boto3.session.Session().resource('s3')
s3.create_bucket(Bucket=self.bucket_name)
s3.Object(self.bucket_name, self.key_name).put(Body=self.key_contents)
错误:
...
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 344, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 314, in _raise_timeout
if 'timed out' in str(err) or 'did not complete (read)' in str(err): # Python 2.6
TypeError: __str__ returned non-string (type WantWriteError)
botocore.hooks: DEBUG: Event needs-retry.s3.CreateBucket: calling handler <botocore.retryhandler.RetryHandler object at 0x10ce75310>
moto 似乎没有正确模拟 boto3 调用 - 我该如何让它工作?
对我有用的是在 运行 使用 boto3
进行模拟测试之前使用 boto
设置环境。
这是一个工作片段:
import unittest
import boto
from boto.s3.key import Key
from moto import mock_s3
import boto3
class TestS3Actor(unittest.TestCase):
mock_s3 = mock_s3()
def setUp(self):
self.mock_s3.start()
self.location = "eu-west-1"
self.bucket_name = 'test_bucket_01'
self.key_name = 'stats_com/fake_fake/test.json'
self.key_contents = 'This is test data.'
s3 = boto.connect_s3()
bucket = s3.create_bucket(self.bucket_name, location=self.location)
k = Key(bucket)
k.key = self.key_name
k.set_contents_from_string(self.key_contents)
def tearDown(self):
self.mock_s3.stop()
def test_s3_boto3(self):
s3 = boto3.resource('s3', region_name=self.location)
bucket = s3.Bucket(self.bucket_name)
assert bucket.name == self.bucket_name
# retrieve already setup keys
keys = list(bucket.objects.filter(Prefix=self.key_name))
assert len(keys) == 1
assert keys[0].key == self.key_name
# update key
s3.Object(self.bucket_name, self.key_name).put(Body='new')
key = s3.Object(self.bucket_name, self.key_name).get()
assert 'new' == key['Body'].read()
当 运行 和 py.test test.py
时,您得到以下输出:
collected 1 items
test.py .
========================================================================================= 1 passed in 2.22 seconds =========================================================================================
根据this信息,貌似还不支持使用Boto3 S3 Put流式上传到s3。
就我而言,我使用以下方法成功将对象上传到存储桶:
s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("file_to_upload", 'rb'))
其中 "file_to_upload" 是您要上传到 s3 存储桶的本地文件。对于您的测试用例,您可以创建一个临时文件来检查此功能:
test_file = open("test_file.json", "w")
test_file.write("some test contents")
test_file.close()
s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("test_file", 'rb'))