如何使用 python boto3 向本地 SQS 队列发送消息?
How to send a message to local SQS queue using python boto3?
我正在尝试使用 https://github.com/roribio/alpine-sqs 容器模拟 AWS SQS 功能。
我能够 运行 docker 容器并使用终端将消息发送到队列。使用 aws configure
配置 AWS 访问密钥 ID 和 AWS 秘密访问密钥 为空字符串
我用来向SQS队列容器发送消息的命令是这个
aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/queue/default --message-body "Hello, queue!"
我能够收到消息,并且可以在 localhost:9235
的浏览器仪表板中看到它。
但是当我尝试在 python
中使用 boto3
发送消息时,它抛出了一个错误。
Traceback (most recent call last): File
"/home/infomagnus/PycharmProjects/InfoMagnus/workload/app/workload/services/queue_services.py",
line 13, in
'Information about current NY Times fiction bestseller for ' File
"/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py",
line 357, in _api_call
return self._make_api_call(operation_name, kwargs) File "/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py",
line 661, in _make_api_call
raise error_class(parsed_response, operation_name) botocore.exceptions.ClientError: An error occurred
(InvalidClientTokenId) when calling the SendMessage operation: The
security token included in the request is invalid
不知道为什么我在使用 aws configure
设置密钥后仍会收到错误消息。
我运行宁的一切都在我的本地。
这是我的代码:
import boto3
sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None)
queue_url = 'http://localhost:9324/queue/default'
resp = sqs.send_message(
QueueUrl=queue_url,
MessageBody=(
'Sample message for Queue.'
)
)
print(resp['MessageId'])
您忘记传递 endpoint_url
。
sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None, endpoint_url='http://localhost:9324')
灵感来自 this example here。查看此脚本以上传 N=10 大小的小批量:
import json
import boto3
from tqdm import tqdm
from more_itertools import ichunked
QUEUE_URL = "<your_queue_url>"
client = boto3.client('sqs')
def send_messages(messages):
entries = [{
'Id': str(ind),
'MessageBody': json.dumps({
# Your body goes here
})
} for ind, obj in enumerate(messages)]
response = client.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)
if 'Failed' in response:
raise Exception(f"Error in messages: {response}")
return response
def enqueue_messages(batch_size=10):
for chunk in tqdm(ichunked(your_generator(), batch_size)):
send_messages(chunk)
if __name__ == "__main__":
enqueue_messages()
我正在尝试使用 https://github.com/roribio/alpine-sqs 容器模拟 AWS SQS 功能。
我能够 运行 docker 容器并使用终端将消息发送到队列。使用 aws configure
我用来向SQS队列容器发送消息的命令是这个
aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/queue/default --message-body "Hello, queue!"
我能够收到消息,并且可以在 localhost:9235
的浏览器仪表板中看到它。
但是当我尝试在 python
中使用 boto3
发送消息时,它抛出了一个错误。
Traceback (most recent call last): File "/home/infomagnus/PycharmProjects/InfoMagnus/workload/app/workload/services/queue_services.py", line 13, in 'Information about current NY Times fiction bestseller for ' File "/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call return self._make_api_call(operation_name, kwargs) File "/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py", line 661, in _make_api_call raise error_class(parsed_response, operation_name) botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the SendMessage operation: The security token included in the request is invalid
不知道为什么我在使用 aws configure
设置密钥后仍会收到错误消息。
我运行宁的一切都在我的本地。
这是我的代码:
import boto3
sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None)
queue_url = 'http://localhost:9324/queue/default'
resp = sqs.send_message(
QueueUrl=queue_url,
MessageBody=(
'Sample message for Queue.'
)
)
print(resp['MessageId'])
您忘记传递 endpoint_url
。
sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None, endpoint_url='http://localhost:9324')
灵感来自 this example here。查看此脚本以上传 N=10 大小的小批量:
import json
import boto3
from tqdm import tqdm
from more_itertools import ichunked
QUEUE_URL = "<your_queue_url>"
client = boto3.client('sqs')
def send_messages(messages):
entries = [{
'Id': str(ind),
'MessageBody': json.dumps({
# Your body goes here
})
} for ind, obj in enumerate(messages)]
response = client.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)
if 'Failed' in response:
raise Exception(f"Error in messages: {response}")
return response
def enqueue_messages(batch_size=10):
for chunk in tqdm(ichunked(your_generator(), batch_size)):
send_messages(chunk)
if __name__ == "__main__":
enqueue_messages()