botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) The security token included in the request is expired
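I'm running into a token expiry problem. The token currently expires after 60 minutes. The trouble is that this Step Function runs for more than 17 hours, so I need to either catch the exception for this session or re-assume the role from Python without interrupting or stopping the Step Function execution. The existing policy can't be changed, so I need a workaround. Any pointers on caching the credentials with AWS Secrets Manager and using them in the Python script would be welcome.
Error: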
> Status...: RUNNING
> Status...: RUNNING
Traceback (most recent call last):
sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 401, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 731, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the DescribeExecution operation: The security token included in the request is expired
##[error]The process '/opt/hostedtoolcache/Python/3.8.12/x64/bin/python' failed with exit code 1
Python code:
import os
import logging
import snowflake.connector
from argparse import ArgumentParser
from datetime import datetime
from typing import Tuple
import time
from time import sleep
import boto3
import json
from botocore.exceptions import ClientError
sts_client = boto3.client('sts')
session = boto3.Session()
# Log which provider supplied the credentials; never print the secret key itself.
print(session.get_credentials().method)

sf_client = boto3.client('stepfunctions', region_name="us-west-2")
sf_output = sf_client.start_execution(
    stateMachineArn='arn:aws:states:us-west-2:xxxxx:stateMachine:PredictiveAnalyticsPipelineOrchestration-xxxxxxxx',
    #input = json.dumps({}) # this is for all
    #input='"{\"basin_list\" : \"PEDREGOSA_BASIN\"}"'
    input='{ \"basin_list\": [\"RIpe\"],\"db\": \"RAMAN\",\"pipelinePhases\": \"lifestage,monthly_production,tiers,sequence,quintiles\"}'
)

while True:
    time.sleep(10)  # don't need to check every nanosecond
    # This call fails with ExpiredTokenException once the 60-minute token runs out
    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
    step_status = sf_response['status']  # BE SURE TO GET THE CURRENT STATE
    print("%s: %s" % ("> Status...", step_status))
    if step_status == 'RUNNING':
        continue
    elif step_status == 'FAILED':
        print(step_status)
        print(f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
        break
    elif step_status == 'TIMED_OUT':
        print(step_status)
        print(f'##vso[task.setvariable variable=step_status]{step_status}')
        break
    elif step_status == 'ABORTED':
        print(step_status)
        print(f'##vso[task.setvariable variable=step_status]{step_status}')
        break
    else:  # SUCCEEDED
        print(step_status)
        print(f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        break
Pipeline code:
jobs:
- job: determine_the_stepfunction_status
  timeoutInMinutes: 5000
  cancelTimeoutInMinutes: 3
  steps:
  - task: AWSAssumeRole@1
    displayName: 'Login to AWS'
    inputs:
      RoleArn: 'arn:aws:iam::$(AWS_ACCOUNT_ID):role/Energyxxxxxx-xxxxx-Role'
      SessionName: 'Energyxxxxxx-xxxxx-Role'
      ConfigureAWSCLIEnvironmentVariables: true
  - task: UsePythonVersion@0
    inputs:
      versionSpec: '3.8'
      #addToPath: true
      #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
  - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
    displayName: 'Install python tools'
  - task: PythonScript@0
    env:
      STEP_STATUS: $(step_status)
      AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
      AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
      AWS_SESSION_TOKEN: $(AWS.SessionToken)
    inputs:
      scriptSource: 'filePath' # Options: filePath, inline
      scriptPath: 'step_function.py'
      failOnStderr: false # Optional
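For the "catch the exception" route asked about above, one possible shape is a small wrapper that rebuilds the client when a call fails with an expired-token error and then retries once. This is only a sketch: `ReauthClient`, `is_expired_token_error` and `make_client` are hypothetical names, and `make_client` is assumed to return a client built from freshly obtained credentials (for example by calling `sts.assume_role` again). `ExpiredTokenException` is the code from the traceback above; including `ExpiredToken` as well is my assumption. The error code is read from the `response` dictionary that botocore's `ClientError` carries.

```python
# Error codes treated as "token expired". The first is from the traceback;
# the second is an assumption about what other services may return.
EXPIRED_CODES = {"ExpiredTokenException", "ExpiredToken"}


def is_expired_token_error(exc):
    """Return True if exc looks like a botocore ClientError for an expired token."""
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code") in EXPIRED_CODES


class ReauthClient:
    """Holds a client and rebuilds it when a call reports an expired token.

    make_client is a hypothetical zero-argument factory that returns a new
    client built from fresh credentials.
    """

    def __init__(self, make_client):
        self._make_client = make_client
        self._client = make_client()

    def call(self, method_name, **kwargs):
        try:
            return getattr(self._client, method_name)(**kwargs)
        except Exception as exc:
            if not is_expired_token_error(exc):
                raise
            # Token expired: build a new client and retry exactly once.
            self._client = self._make_client()
            return getattr(self._client, method_name)(**kwargs)
```

In the polling loop this would replace the direct call with something like `sf.call("describe_execution", executionArn=...)`. Note that the factory has to fetch new credentials itself; calling `boto3.client(...)` again with the same expired environment variables would fail the same way.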
This is now resolved by having Python refresh the session before it expires (roughly every 45 minutes). The Python script was adjusted to:
import os
import logging
import snowflake.connector
from argparse import ArgumentParser
from datetime import datetime
from typing import Tuple
import time
from time import sleep
import boto3
import botocore
import json
import base64
import botocore.session
from botocore.credentials import AssumeRoleCredentialFetcher, DeferredRefreshableCredentials
from botocore.exceptions import ClientError
from botocore.session import get_session
accounts = [
    {"name": "Prod", "id": "youraccountid"},
    #{"name": "Account2", "id": "xxxxxxxxxxx"} # add more entries if you have multiple accounts
]
regions = ["us-west-2"]

# Base credentials come from the default provider chain (here, the pipeline's env vars).
boto3.setup_default_session()


# DurationSeconds must not exceed the role's maximum session duration
# (AWS default is 1 hour); 3000 seconds is 50 minutes. If you're
# role chaining (e.g. saml2aws), 1 hour is a hard limit.
def refresh_external_credentials(role_arn):
    # Assume the role and return the credentials in the shape botocore expects
    client = boto3.client('sts')
    credentials = client.assume_role(
        RoleArn=role_arn,
        RoleSessionName="Energyxxxxxx-xxxxx-Role",  # this name does not matter
        DurationSeconds=3000
    ).get("Credentials")
    return {
        "access_key": credentials.get('AccessKeyId'),
        "secret_key": credentials.get('SecretAccessKey'),
        "token": credentials.get('SessionToken'),
        "expiry_time": credentials.get('Expiration').isoformat()
    }


for account in accounts:
    account_id = account.get('id')
    accountName = account.get('name')
    # Replace the role name with your target role
    roleArn = 'arn:aws:iam::' + str(account_id) + ':role/Energyxxxxxx-xxxxx-Role'
    credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
        metadata=refresh_external_credentials(roleArn),
        # Bind roleArn now so each account keeps refreshing its own role
        refresh_using=lambda arn=roleArn: refresh_external_credentials(arn),
        method="sts-assume-role",
    )
    for region in regions:
        session = get_session()
        # _credentials is a private attribute and may change between botocore versions
        session._credentials = credentials
        session.set_config_variable("region", region)
        autorefresh_session = boto3.session.Session(botocore_session=session)
        # Your boto3 calls, for example...
        #rds = autorefresh_session.client('rds')
        #databases = rds.describe_db_instances()
        sf_client = autorefresh_session.client('stepfunctions')
        sf_output = sf_client.start_execution(
            stateMachineArn='arn:aws:states:us-west-2:youraccountid:stateMachine:name_of_your_state_machine',
            #input = json.dumps({}) # this is for all
            #input='"{\"basin_list\" : \"PEDREGOSA_BASIN\"}"'
            input='{ \"basin_list\": [\"POwer\",\"BOW\"],\"db_postfix\": \"schemaname\",\"pipe\": \"etl,frac,ecline,capex,breakeven,lifestage\"}'
        )
        while True:
            time.sleep(10)  # don't need to check every nanosecond
            sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
            step_status = sf_response['status']  # BE SURE TO GET THE CURRENT STATE
            print("%s: %s" % ("> Status...", step_status))
            if step_status == 'RUNNING':
                continue
            elif step_status == 'FAILED':
                print(step_status)
                print(f'##vso[task.setvariable variable=step_status]{step_status}')
                print(sf_response)
                #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
                break
            elif step_status == 'TIMED_OUT':
                print(step_status)
                print(f'##vso[task.setvariable variable=step_status]{step_status}')
                break
            elif step_status == 'ABORTED':
                print(step_status)
                print(f'##vso[task.setvariable variable=step_status]{step_status}')
                break
            else:  # SUCCEEDED
                print(step_status)
                print(f'##vso[task.setvariable variable=step_status]{step_status}')
                print(sf_response)
                break
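To see when the refresh in the script above actually happens, it helps to do the arithmetic. The sketch below assumes botocore's `RefreshableCredentials` defaults as I understand them (an advisory refresh window of 15 minutes and a mandatory one of 10 minutes before expiry); treat both constants as assumptions and check them against your installed botocore version. `refresh_window` is a hypothetical helper, not part of boto3, and the issue time is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone

# Assumed botocore defaults (seconds before expiry); verify for your version.
ADVISORY_REFRESH_SECONDS = 15 * 60
MANDATORY_REFRESH_SECONDS = 10 * 60


def refresh_window(issued_at, duration_seconds):
    """Return (advisory_start, mandatory_start, expiry) for one set of credentials."""
    expiry = issued_at + timedelta(seconds=duration_seconds)
    advisory = expiry - timedelta(seconds=ADVISORY_REFRESH_SECONDS)
    mandatory = expiry - timedelta(seconds=MANDATORY_REFRESH_SECONDS)
    return advisory, mandatory, expiry


# Arbitrary example issue time; 3000 seconds matches DurationSeconds in the script.
issued = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
advisory, mandatory, expiry = refresh_window(issued, duration_seconds=3000)
print("first refresh attempt from:", advisory.isoformat())
print("refresh becomes blocking at:", mandatory.isoformat())
print("credentials expire at:", expiry.isoformat())
```

Under these assumptions, credentials issued for 3000 seconds (50 minutes) become eligible for refresh about 35 minutes after they are issued. Since the polling loop makes a call every 10 seconds, the refresh would be triggered almost immediately after that point.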
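As for the pipeline, it no longer needs to assume the role; dropping that step avoids conflicts with the role assumed inside the Python script.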
jobs:
- job: determine_the_stepfunction_status
  timeoutInMinutes: 5000
  cancelTimeoutInMinutes: 3
  steps:
  - task: UsePythonVersion@0
    inputs:
      versionSpec: '3.8'
      #addToPath: true
      #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
  - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
    displayName: 'Install python tools'
  - task: PythonScript@0
    env:
      STEP_STATUS: $(step_status)
      AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
      AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
      AWS_SESSION_TOKEN: $(AWS.SessionToken)
    inputs:
      scriptSource: 'filePath' # Options: filePath, inline
      scriptPath: 'step_function.py'
      failOnStderr: false # Optional
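Because the comment in the script notes that role chaining caps a session at one hour, it can be useful to check at start-up which kind of identity the script begins with. The sketch below parses the `Arn` field returned by `sts.get_caller_identity()`; `is_assumed_role_arn` and `warn_if_role_chaining` are hypothetical helpers, not part of boto3.

```python
def is_assumed_role_arn(arn):
    """Return True if the ARN identifies an assumed-role session.

    Assumed-role sessions have ARNs of the form
    arn:aws:sts::<account>:assumed-role/<role-name>/<session-name>.
    """
    parts = arn.split(":", 5)
    if len(parts) != 6:
        return False
    service, resource = parts[2], parts[5]
    return service == "sts" and resource.startswith("assumed-role/")


def warn_if_role_chaining(sts_client):
    """Print a warning when the starting identity is itself an assumed role."""
    arn = sts_client.get_caller_identity()["Arn"]
    if is_assumed_role_arn(arn):
        print(f"Starting identity {arn} is an assumed role; "
              "calling assume_role from here is role chaining (1 hour maximum).")
        return True
    return False
```

Calling `warn_if_role_chaining(boto3.client('sts'))` before building the refreshable credentials would flag the case where the base credentials are themselves temporary, in which case the refresh callback can only work until those base credentials expire.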