使用 moto + serverless 模拟 DynamoDB
Mocking DynamoDB using moto + serverless
我正在尝试使用 AWS 无服务器框架为无服务器应用程序编写测试。我面临一个奇怪的问题。每当我尝试使用 moto 模拟 S3 或 DynamoDB 时,它都不起作用。 boto3 调用实际上不是模拟,而是转到我的 AWS 帐户并尝试在那里做事。
这是不可取的行为。你能帮忙吗?
示例代码:
import datetime
import boto3
import uuid
import os
from moto import mock_dynamodb2
from unittest import mock, TestCase
from JobEngine.job_engine import check_duplicate
class TestJobEngine(TestCase):
@mock.patch.dict(os.environ, {'IN_QUEUE_URL': 'mytemp'})
@mock.patch('JobEngine.job_engine.logger')
@mock_dynamodb2
def test_check_duplicate(self, mock_logger):
id = 'ABCD123'
db = boto3.resource('dynamodb', 'us-east-1')
table = db.create_table(
TableName='my_table',
KeySchema=[
{
'AttributeName': 'id',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'id',
'AttributeType': 'S'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
)
table.meta.client.get_waiter('table_exists').wait(TableName='my_table')
table.put_item(
Item={
'id': {'S': id},
... other data ...
}
)
res = check_duplicate(id)
self.assertTrue(mock_logger.info.called)
self.assertEqual(res, True, 'True')
请看上面的代码,我试图在 table 中插入一条记录,然后调用一个函数来验证指定的 ID 是否已经存在于 table 中。在这里,当我 运行 this code.
时,我得到一个错误 table already exists
如果我禁用网络,我会得到一个错误:
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "https://dynamodb.us-east-1.amazonaws.com/"
我不明白为什么我们尝试模拟时会尝试连接到 AWS。
我做了一些挖掘,终于设法解决了这个问题。
见https://github.com/spulec/moto/issues/1793
这个问题是由于 boto 和 moto 之间的一些不兼容造成的。当我们将 botocore 降级到 1.10.84
时一切正常
我正在尝试使用 AWS 无服务器框架为无服务器应用程序编写测试。我面临一个奇怪的问题。每当我尝试使用 moto 模拟 S3 或 DynamoDB 时,它都不起作用。 boto3 调用实际上不是模拟,而是转到我的 AWS 帐户并尝试在那里做事。
这是不可取的行为。你能帮忙吗?
示例代码:
import datetime
import boto3
import uuid
import os
from moto import mock_dynamodb2
from unittest import mock, TestCase
from JobEngine.job_engine import check_duplicate
class TestJobEngine(TestCase):
@mock.patch.dict(os.environ, {'IN_QUEUE_URL': 'mytemp'})
@mock.patch('JobEngine.job_engine.logger')
@mock_dynamodb2
def test_check_duplicate(self, mock_logger):
id = 'ABCD123'
db = boto3.resource('dynamodb', 'us-east-1')
table = db.create_table(
TableName='my_table',
KeySchema=[
{
'AttributeName': 'id',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'id',
'AttributeType': 'S'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
)
table.meta.client.get_waiter('table_exists').wait(TableName='my_table')
table.put_item(
Item={
'id': {'S': id},
... other data ...
}
)
res = check_duplicate(id)
self.assertTrue(mock_logger.info.called)
self.assertEqual(res, True, 'True')
请看上面的代码,我试图在 table 中插入一条记录,然后调用一个函数来验证指定的 ID 是否已经存在于 table 中。在这里,当我 运行 this code.
时,我得到一个错误 table already exists如果我禁用网络,我会得到一个错误:
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "https://dynamodb.us-east-1.amazonaws.com/"
我不明白为什么我们尝试模拟时会尝试连接到 AWS。
我做了一些挖掘,终于设法解决了这个问题。
见https://github.com/spulec/moto/issues/1793
这个问题是由于 boto 和 moto 之间的一些不兼容造成的。当我们将 botocore 降级到 1.10.84
时一切正常