如何从 boto3 ecs execute_command 获取输出?

How can I get output from boto3 ecs execute_command?

我在 Fargate 上有一个 ECS 任务 运行ning,我想在上面 运行 boto3 中的命令并取回输出。我可以在 awscli 中这样做。

➜ aws ecs execute-command --cluster cluster1 \                                                                                   
    --task abc \
    --container container1 \
    --interactive \
    --command 'echo hi'    

The Session Manager plugin was installed successfully. Use the AWS CLI to start a session.

Starting session with SessionId: ecs-execute-command-0f913e47ae7801aeb
hi

Exiting session with sessionId: ecs-execute-command-0f913e47ae7801aeb.

但我不知道如何在 boto3 中获得相同的输出。

ecs = boto3.client("ecs")
ssm = boto3.client("ssm")
exec_resp = ecs.execute_command(
    cluster=self.cluster,
    task=self.task,
    container=self.container,
    interactive=True,
    command="echo hi",
)
s_active = ssm.describe_sessions(
    State="Active",
    Filters=[
        {
            "key": "SessionId",
            "value": exec_resp["session"]["sessionId"],
        },
    ],
)
# Here I get the document for the active session.
doc_active = ssm.get_document(Name=s_active["Sessions"][0]["DocumentName"])
# Now I wait for the session to finish.
s_history = {}
done = False
while not done:
    s_history = ssm.describe_sessions(
        State="History",
        Filters=[
            {
                "key": "SessionId",
                "value": exec_resp["session"]["sessionId"],
            },
        ],
    )
    done = len(s_history["Sessions"]) > 0
doc_history = ssm.get_document(Name=s_history["Sessions"][0]["DocumentName"])

现在会话正在终止,我得到了另一个文档,但似乎仍然没有任何输出。有人从中得到输出吗?怎么样?


对于前来寻求类似解决方案的任何人,我创建了一个工具来简化此任务。它被称为interloper. This is mostly thanks to the excellent

好的,基本上通过阅读 ssm 会话管理器插件 source code 我想出了以下简化的重新实现,它能够只获取命令输出: (你需要pip install websocket-client construct

import json
import uuid

import boto3
import construct as c
import websocket

ecs = boto3.client("ecs")
ssm = boto3.client("ssm")
exec_resp = ecs.execute_command(
    cluster=self.cluster,
    task=self.task,
    container=self.container,
    interactive=True,
    command="ls -la /",
)

session = exec_resp['session']
connection = websocket.create_connection(session['streamUrl'])
try:
    init_payload = {
        "MessageSchemaVersion": "1.0",
        "RequestId": str(uuid.uuid4()),
        "TokenValue": session['tokenValue']
    }
    connection.send(json.dumps(init_payload))

    AgentMessageHeader = c.Struct(
        'HeaderLength' / c.Int32ub,
        'MessageType' / c.PaddedString(32, 'ascii'),
    )

    AgentMessagePayload = c.Struct(
        'PayloadLength' / c.Int32ub,
        'Payload' / c.PaddedString(c.this.PayloadLength, 'ascii')
    )

    while True:
        response = connection.recv()

        message = AgentMessageHeader.parse(response)

        if 'channel_closed' in message.MessageType:
            raise Exception('Channel closed before command output was received')

        if 'output_stream_data' in message.MessageType:
            break

finally:
    connection.close()

payload_message = AgentMessagePayload.parse(response[message.HeaderLength:])

print(payload_message.Payload)