boto3: execute_command 在 python 脚本中
boto3: execute_command inside python script
我正在尝试 运行 向 fargate 管理的 ecs 容器发出命令。我可以建立连接并成功执行,但我无法从我的 python 脚本中的上述命令获得响应。
import boto3
import pprint as pp
client = boto3.client("ecs")
cluster = "my-mundane-cluster-name"
def main():
task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
for task_arn in task_arns.get("taskArns", []):
cmd_out = client.execute_command(
cluster=cluster,
command="ls",
interactive=True,
task=task_arn,
)
pp.pprint(f"{cmd_out}")
if __name__ == "__main__":
main()
我用 ls
替换了命令,但就所有意图和目的而言,流程是相同的。这是我得到的回复
{
'clusterArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:cluster/■■■■■■',
'containerArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:container/■■■■■■/■■■■■■/■■■■■■■■■■■■■■■■■■',
'containerName': '■■■■■■',
'interactive': True,
'session': {
'sessionId': 'ecs-execute-command-■■■■■■■■■',
'streamUrl': '■■■■■■■■■■■■■■■■■■',
'tokenValue': '■■■■■■■■■■■■■■■■■■'
},
'taskArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■■■■:task/■■■■■■■■■/■■■■■■■■■■■■■■■■■■',
'ResponseMetadata': {
'RequestId': '■■■■■■■■■■■■■■■■■■',
'HTTPStatusCode': 200,
'HTTPHeaders': {
'x-amzn-requestid': '■■■■■■■■■■■■■■■■■■',
'content-type': 'application/x-amz-json-1.1',
'content-length': '■■■',
'date': 'Thu, 29 Jul 2021 02:39:24 GMT'
},
'RetryAttempts': 0
}
}
我已经尝试 运行 将命令设置为非交互式命令以查看它是否 returns 响应,但 sdk 显示 Interactive is the only mode supported currently
。我也尝试过在线搜索有关如何执行此操作的线索,但没有成功。
非常感谢任何帮助。
一个快速的解决方案是使用 logging
而不是 pprint
:
boto3.set_stream_logger('boto3.resources', logging.INFO)
命令输出的值位于位于 streamId
的文档流中。您必须初始化一个新会话并将会话 ID 传递给它以检索它的内容。
粗略示例:
import boto3
import pprint as pp
client = boto3.client("ecs")
ssm_client = boto3.client("ssm")
cluster = "my-mundane-cluster-name"
def main():
task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
for task_arn in task_arns.get("taskArns", []):
cmd_out = client.execute_command(
cluster=cluster,
command="ls",
interactive=True,
task=task_arn,
)
session_response = client.describe_sessions(
State='Active'|'History',
MaxResults=123,
NextToken='string',
Filters=[
{
'key': 'InvokedAfter'|'InvokedBefore'|'Target'|'Owner'|'Status'|'SessionId',
'value': cmd_out["session"]["sessionId"]
},
]
)
document_response = client.get_document(
Name=session_response.sessions[0].document_name,
DocumentFormat='YAML'|'JSON'|'TEXT'
)
pp.pprint(document_response)
参考资料
SSM:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html
SSM #get_document:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html#SSM.Client.get_document
我需要完成类似的任务,但据我所知,它并没有像这里回答的那样工作。让我知道这是否对您有用,如果有用,您是如何实施的。
对我来说,解决方案是打开会话中返回的 websocket 连接,然后读取输出。像这样:
import boto3
import json
import uuid
import construct as c
import websocket
def session_reader(session: dict) -> str:
AgentMessageHeader = c.Struct(
"HeaderLength" / c.Int32ub,
"MessageType" / c.PaddedString(32, "ascii"),
)
AgentMessagePayload = c.Struct(
"PayloadLength" / c.Int32ub,
"Payload" / c.PaddedString(c.this.PayloadLength, "ascii"),
)
connection = websocket.create_connection(session["streamUrl"])
try:
init_payload = {
"MessageSchemaVersion": "1.0",
"RequestId": str(uuid.uuid4()),
"TokenValue": session["tokenValue"],
}
connection.send(json.dumps(init_payload))
while True:
resp = connection.recv()
message = AgentMessageHeader.parse(resp)
if "channel_closed" in message.MessageType:
raise Exception("Channel closed before command output was received")
if "output_stream_data" in message.MessageType:
break
finally:
connection.close()
payload_message = AgentMessagePayload.parse(resp[message.HeaderLength :])
return payload_message.Payload
exec_resp = boto3.client("ecs").execute_command(
cluster=cluster,
task=task,
container=container,
interactive=True,
command=cmd,
)
print(session_reader(exec_resp["session"]))
这就是 我的类似问题的全部内容。
对于前来寻求类似解决方案的任何人,我创建了一个工具来简化此任务。它被称为interloper。
我正在尝试 运行 向 fargate 管理的 ecs 容器发出命令。我可以建立连接并成功执行,但我无法从我的 python 脚本中的上述命令获得响应。
import boto3
import pprint as pp
client = boto3.client("ecs")
cluster = "my-mundane-cluster-name"
def main():
task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
for task_arn in task_arns.get("taskArns", []):
cmd_out = client.execute_command(
cluster=cluster,
command="ls",
interactive=True,
task=task_arn,
)
pp.pprint(f"{cmd_out}")
if __name__ == "__main__":
main()
我用 ls
替换了命令,但就所有意图和目的而言,流程是相同的。这是我得到的回复
{
'clusterArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:cluster/■■■■■■',
'containerArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■:container/■■■■■■/■■■■■■/■■■■■■■■■■■■■■■■■■',
'containerName': '■■■■■■',
'interactive': True,
'session': {
'sessionId': 'ecs-execute-command-■■■■■■■■■',
'streamUrl': '■■■■■■■■■■■■■■■■■■',
'tokenValue': '■■■■■■■■■■■■■■■■■■'
},
'taskArn': 'arn:aws:ecs:■■■■■■■■■■■■:■■■■■■■■■:task/■■■■■■■■■/■■■■■■■■■■■■■■■■■■',
'ResponseMetadata': {
'RequestId': '■■■■■■■■■■■■■■■■■■',
'HTTPStatusCode': 200,
'HTTPHeaders': {
'x-amzn-requestid': '■■■■■■■■■■■■■■■■■■',
'content-type': 'application/x-amz-json-1.1',
'content-length': '■■■',
'date': 'Thu, 29 Jul 2021 02:39:24 GMT'
},
'RetryAttempts': 0
}
}
我已经尝试 运行 将命令设置为非交互式命令以查看它是否 returns 响应,但 sdk 显示 Interactive is the only mode supported currently
。我也尝试过在线搜索有关如何执行此操作的线索,但没有成功。
非常感谢任何帮助。
一个快速的解决方案是使用 logging
而不是 pprint
:
boto3.set_stream_logger('boto3.resources', logging.INFO)
命令输出的值位于位于 streamId
的文档流中。您必须初始化一个新会话并将会话 ID 传递给它以检索它的内容。
粗略示例:
import boto3
import pprint as pp
client = boto3.client("ecs")
ssm_client = boto3.client("ssm")
cluster = "my-mundane-cluster-name"
def main():
task_arns = client.list_tasks(cluster=cluster, launchType="FARGATE")
for task_arn in task_arns.get("taskArns", []):
cmd_out = client.execute_command(
cluster=cluster,
command="ls",
interactive=True,
task=task_arn,
)
session_response = client.describe_sessions(
State='Active'|'History',
MaxResults=123,
NextToken='string',
Filters=[
{
'key': 'InvokedAfter'|'InvokedBefore'|'Target'|'Owner'|'Status'|'SessionId',
'value': cmd_out["session"]["sessionId"]
},
]
)
document_response = client.get_document(
Name=session_response.sessions[0].document_name,
DocumentFormat='YAML'|'JSON'|'TEXT'
)
pp.pprint(document_response)
参考资料
SSM:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html
SSM #get_document:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html#SSM.Client.get_document
我需要完成类似的任务,但据我所知,它并没有像这里回答的那样工作。让我知道这是否对您有用,如果有用,您是如何实施的。
对我来说,解决方案是打开会话中返回的 websocket 连接,然后读取输出。像这样:
import boto3
import json
import uuid
import construct as c
import websocket
def session_reader(session: dict) -> str:
AgentMessageHeader = c.Struct(
"HeaderLength" / c.Int32ub,
"MessageType" / c.PaddedString(32, "ascii"),
)
AgentMessagePayload = c.Struct(
"PayloadLength" / c.Int32ub,
"Payload" / c.PaddedString(c.this.PayloadLength, "ascii"),
)
connection = websocket.create_connection(session["streamUrl"])
try:
init_payload = {
"MessageSchemaVersion": "1.0",
"RequestId": str(uuid.uuid4()),
"TokenValue": session["tokenValue"],
}
connection.send(json.dumps(init_payload))
while True:
resp = connection.recv()
message = AgentMessageHeader.parse(resp)
if "channel_closed" in message.MessageType:
raise Exception("Channel closed before command output was received")
if "output_stream_data" in message.MessageType:
break
finally:
connection.close()
payload_message = AgentMessagePayload.parse(resp[message.HeaderLength :])
return payload_message.Payload
exec_resp = boto3.client("ecs").execute_command(
cluster=cluster,
task=task,
container=container,
interactive=True,
command=cmd,
)
print(session_reader(exec_resp["session"]))
这就是
对于前来寻求类似解决方案的任何人,我创建了一个工具来简化此任务。它被称为interloper。