boto3: execute_command 在 python 脚本中

boto3: execute_command inside python script

我正在尝试 运行 向 fargate 管理的 ecs 容器发出命令。我可以建立连接并成功执行,但我无法从我的 python 脚本中的上述命令获得响应。

import boto3
import pprint as pp

client = boto3.client("ecs")
cluster = "my-mundane-cluster-name"


def main():
    task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
    for task_arn in task_arns.get("taskArns", []):
        cmd_out = client.execute_command(
            cluster=cluster,
            command="ls",
            interactive=True,
            task=task_arn,
        )
        pp.pprint(f"{cmd_out}")


if __name__ == "__main__":
    main()

我用 ls 替换了命令,但就所有意图和目的而言,流程是相同的。这是我得到的回复

{
    'clusterArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:cluster/■■■■■■',
    'containerArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:container/■■■■■■/■■■■■■/■■■■■■■■■■■■■■■■■■',
    'containerName': '■■■■■■',
    'interactive': True,
    'session': {
        'sessionId': 'ecs-execute-command-■■■■■■■■■',
        'streamUrl': '■■■■■■■■■■■■■■■■■■',
        'tokenValue': '■■■■■■■■■■■■■■■■■■'
    },
    'taskArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■■■■:task/■■■■■■■■■/■■■■■■■■■■■■■■■■■■',
    'ResponseMetadata': {
        'RequestId': '■■■■■■■■■■■■■■■■■■',
        'HTTPStatusCode': 200,
        'HTTPHeaders': {
            'x-amzn-requestid': '■■■■■■■■■■■■■■■■■■',
            'content-type': 'application/x-amz-json-1.1',
            'content-length': '■■■',
            'date': 'Thu, 29 Jul 2021 02:39:24 GMT'
        },
        'RetryAttempts': 0
    }
}

我已经尝试 运行 将命令设置为非交互式命令以查看它是否 returns 响应,但 sdk 显示 Interactive is the only mode supported currently。我也尝试过在线搜索有关如何执行此操作的线索,但没有成功。

非常感谢任何帮助。

一个快速的解决方案是使用 logging 而不是 pprint:

boto3.set_stream_logger('boto3.resources', logging.INFO) 

命令输出的值位于位于 streamId 的文档流中。您必须初始化一个新会话并将会话 ID 传递给它以检索它的内容。

粗略示例:

import boto3
import pprint as pp

client = boto3.client("ecs")
ssm_client = boto3.client("ssm")
cluster = "my-mundane-cluster-name"


def main():
    task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
    for task_arn in task_arns.get("taskArns", []):
        cmd_out = client.execute_command(
            cluster=cluster,
            command="ls",
            interactive=True,
            task=task_arn,
        )
        
        session_response = client.describe_sessions(
            State='Active'|'History',
            MaxResults=123,
            NextToken='string',
            Filters=[
                {
                    'key': 'InvokedAfter'|'InvokedBefore'|'Target'|'Owner'|'Status'|'SessionId',
                    'value': cmd_out["session"]["sessionId"]
                },
            ]
        )

        document_response = client.get_document(
            Name=session_response.sessions[0].document_name,
            DocumentFormat='YAML'|'JSON'|'TEXT'
        )

        pp.pprint(document_response)

参考资料

SSM:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html

SSM #get_document:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html#SSM.Client.get_document

我需要完成类似的任务,但据我所知,它并没有像这里回答的那样工作。让我知道这是否对您有用,如果有用,您是如何实施的。

对我来说,解决方案是打开会话中返回的 websocket 连接,然后读取输出。像这样:

import boto3
import json
import uuid
import construct as c
import websocket

def session_reader(session: dict) -> str:
    AgentMessageHeader = c.Struct(
        "HeaderLength" / c.Int32ub,
        "MessageType" / c.PaddedString(32, "ascii"),
    )

    AgentMessagePayload = c.Struct(
        "PayloadLength" / c.Int32ub,
        "Payload" / c.PaddedString(c.this.PayloadLength, "ascii"),
    )

    connection = websocket.create_connection(session["streamUrl"])
    try:
        init_payload = {
            "MessageSchemaVersion": "1.0",
            "RequestId": str(uuid.uuid4()),
            "TokenValue": session["tokenValue"],
        }
        connection.send(json.dumps(init_payload))
        while True:
            resp = connection.recv()
            message = AgentMessageHeader.parse(resp)
            if "channel_closed" in message.MessageType:
                raise Exception("Channel closed before command output was received")
            if "output_stream_data" in message.MessageType:
                break
    finally:
        connection.close()
    payload_message = AgentMessagePayload.parse(resp[message.HeaderLength :])
    return payload_message.Payload


exec_resp = boto3.client("ecs").execute_command(
    cluster=cluster,
    task=task,
    container=container,
    interactive=True,
    command=cmd,
)
print(session_reader(exec_resp["session"]))

这就是 我的类似问题的全部内容。


对于前来寻求类似解决方案的任何人,我创建了一个工具来简化此任务。它被称为interloper