如何使用 python boto3 向本地 SQS 队列发送消息?

How to send a message to local SQS queue using python boto3?

我正在尝试使用 https://github.com/roribio/alpine-sqs 容器模拟 AWS SQS 功能。

我能够 运行 docker 容器并使用终端将消息发送到队列。使用 aws configure

配置 AWS 访问密钥 IDAWS 秘密访问密钥 为空字符串

我用来向SQS队列容器发送消息的命令是这个

aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/queue/default --message-body "Hello, queue!"

我能够收到消息,并且可以在 localhost:9235 的浏览器仪表板中看到它。

但是当我尝试在 python 中使用 boto3 发送消息时,它抛出了一个错误。

Traceback (most recent call last): File "/home/infomagnus/PycharmProjects/InfoMagnus/workload/app/workload/services/queue_services.py", line 13, in 'Information about current NY Times fiction bestseller for ' File "/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call return self._make_api_call(operation_name, kwargs) File "/home/infomagnus/envs/DSDPenv/lib/python3.7/site-packages/botocore/client.py", line 661, in _make_api_call raise error_class(parsed_response, operation_name) botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the SendMessage operation: The security token included in the request is invalid

不知道为什么我在使用 aws configure 设置密钥后仍会收到错误消息。

我运行宁的一切都在我的本地。

这是我的代码:

import boto3
sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None)
queue_url = 'http://localhost:9324/queue/default'
resp = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=(
        'Sample message for Queue.'
    )
)
print(resp['MessageId'])

您忘记传递 endpoint_url

sqs = boto3.client('sqs', aws_access_key_id=None, aws_secret_access_key=None, endpoint_url='http://localhost:9324')

灵感来自 this example here。查看此脚本以上传 N=10 大小的小批量:

import json
import boto3
from tqdm import tqdm
from more_itertools import ichunked

QUEUE_URL = "<your_queue_url>"
client = boto3.client('sqs')

def send_messages(messages):
    entries = [{
        'Id': str(ind),
        'MessageBody': json.dumps({
            # Your body goes here
        })
    } for ind, obj in enumerate(messages)]

    response = client.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)
    if 'Failed' in response:
        raise Exception(f"Error in messages: {response}")
    return response

def enqueue_messages(batch_size=10):
    for chunk in tqdm(ichunked(your_generator(), batch_size)):
        send_messages(chunk)

if __name__ == "__main__":
    enqueue_messages()