Python AWS SQS 模拟 MOTO
Python AWS SQS mocking with MOTO
我正在尝试用 moto 模拟 AWS SQS,下面是我的代码
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs
@mock_sqs
def test_get_all_msg_from_queue():
#from myClass import get_msg_from_sqs
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"])
assert resp is not None
执行此操作时出现以下错误
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'
如果我尝试另一种在 SQS 中发送消息的方式(请参阅注释掉的代码#Tried this as well)
然后在我的方法 get_msg_from_sqs 中实际调用 SQS 时,出现以下错误
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.
我是 运行 它在 win10 上 PyCharm 并且 moto 版本设置为
moto = "^2.2.6"
我的代码如下
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
我在这里错过了什么?
根据@gshpychka,您需要查看 create_queue
的工作原理。具体来说,它 returns 这种形式的字典:
Response Structure
(dict) --
QueueUrl (string) --
已创建的 Amazon SQS 队列 URL。
所以使用这个 api 你可以:
import boto3
from time import sleep
conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
QueueUrl=queue,
MessageBody='string',
...)
我同意文档令人困惑。造成混乱的原因可能是 sqs resource api,它的工作方式不同:
import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)
这是可行的,因为 this api returns 一个 Queue
对象,这可能是您所期望的。
请注意@gshpychka 已经在评论中给出了答案;我刚写出来。
您的 queue
变量是 create_queue:
返回的字典
queue = conn.create_queue(QueueName='Test')
它不是队列,因此您不能对其调用 sendMessage
。
为此,您需要创建一个队列对象:
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)
queue.send_message()
我正在尝试用 moto 模拟 AWS SQS,下面是我的代码
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs
@mock_sqs
def test_get_all_msg_from_queue():
#from myClass import get_msg_from_sqs
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"])
assert resp is not None
执行此操作时出现以下错误
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'
如果我尝试另一种在 SQS 中发送消息的方式(请参阅注释掉的代码#Tried this as well) 然后在我的方法 get_msg_from_sqs 中实际调用 SQS 时,出现以下错误
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.
我是 运行 它在 win10 上 PyCharm 并且 moto 版本设置为
moto = "^2.2.6"
我的代码如下
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
我在这里错过了什么?
根据@gshpychka,您需要查看 create_queue
的工作原理。具体来说,它 returns 这种形式的字典:
Response Structure (dict) --
QueueUrl (string) --
已创建的 Amazon SQS 队列 URL。
所以使用这个 api 你可以:
import boto3
from time import sleep
conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
QueueUrl=queue,
MessageBody='string',
...)
我同意文档令人困惑。造成混乱的原因可能是 sqs resource api,它的工作方式不同:
import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)
这是可行的,因为 this api returns 一个 Queue
对象,这可能是您所期望的。
请注意@gshpychka 已经在评论中给出了答案;我刚写出来。
您的 queue
变量是 create_queue:
queue = conn.create_queue(QueueName='Test')
它不是队列,因此您不能对其调用 sendMessage
。
为此,您需要创建一个队列对象:
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)
queue.send_message()