Python:如果 JSON 对象为空则什么都不做
Python do nothing IF empty JSON object
我是一个 Python 新手,正在寻找最佳实践建议以及解决此问题的方法。我能够成功地从 Okta API 中提取我需要的数据,得到的输出是 "PRECONDITION FAILED. Errors reported by remote server: Failed to update. Resource changed on the server."。但这只实现了我想要的一半。我希望能够每 60 分钟检查一次该输出,如果不存在则什么都不做。我以为下面代码中的 "start" 函数就是这样做的,但它似乎没有按照我想要的方式运行。如何将 JSON 对象存储在变量 events 中,或者在对象不存在时退出应用程序?先谢谢您!
import requests
import os
import json
import time
from datetime import datetime, timedelta
# Value sent as the "authorization" header; a missing OKTA_AUTH variable
# raises KeyError here, before any request is made.
key = os.environ['OKTA_AUTH']

# Endpoint that the log query is sent to.
api_url = 'https://domain.okta.com/api/v1/logs'

# Individual filter clauses, combined with "and" into one filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
app_id = 'target.id eq "*******"'
all_params = ' and '.join((event_type, app_id, outcome))

# Lower bound of the query window: one hour before the script started,
# taken from the naive UTC clock and rendered with a literal ".000Z" suffix.
last_hour_date_time = datetime.utcnow() - timedelta(hours=1)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
def auth_okta():
    """Fetch the events matching the module-level filter from ``api_url``.

    Sends a GET request carrying ``all_params`` as the ``filter`` query
    parameter and ``since`` as the lower time bound, with ``key`` in the
    ``authorization`` header. The URL is printed before the request is made.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    print(api_url)
    response = requests.get(
        api_url,
        params={'filter': all_params, 'since': since},
        headers={'Accept': 'application/json', 'authorization': key},
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    # Fail loudly on an error status instead of returning the error payload
    # to a caller that iterates the result as a list of events.
    response.raise_for_status()
    return response.json()
def start():
    """Print the failure reason of every event returned by ``auth_okta``.

    An event whose ``outcome`` or ``reason`` is missing, null or empty prints
    ``'nothing there'`` instead. An empty result prints nothing at all,
    because the loop body never runs.
    """
    for event_data in auth_okta():
        # "outcome" may be absent or null; fall back to {} so the second
        # lookup returns None instead of raising.
        events = (event_data.get('outcome') or {}).get('reason')
        print(events if events else 'nothing there')


start()
尝试在您的 event_data 上使用 .get 方法:
# Poll until at least one event carries a failure reason. auth_okta() is
# called again on every pass so that fresh data is inspected each time.
while True:
    events = []
    for event_data in auth_okta():
        # .get with an empty-dict fallback yields None for a missing or null
        # "outcome"/"reason" instead of raising.
        reason = (event_data.get("outcome") or {}).get("reason")
        if reason is not None:
            events.append(reason)
    if events:
        break
    print("Nothing found... waiting")
    time.sleep(3600)  # wait one hour
for reason in events:
    print(reason)
您可以使用 sys.exit() 结束所有执行。
import sys

# Nothing was found: end the program with a success status.
# Identity comparison is the correct test for None.
if events is None:
    sys.exit(0)
似乎我的代码初始结构创建了太多列表,我最终重构了整个脚本。我也听取了格雷格的一些建议。
import requests
import sys
import os
import json
import time
from datetime import datetime, timedelta
# Value sent as the "authorization" header; a missing OKTA_AUTH variable
# raises KeyError here, before any request is made.
key = os.environ['OKTA_AUTH']

# Endpoints: the log query target and the Slack webhook that receives alerts.
api_url = 'https://DOMAIN.okta.com/api/v1/logs'
slack_url = "SLACK WEBHOOK URL"

# Individual filter clauses, combined with "and" into one filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
target_type = 'target.type eq "User"'
app_id = 'target.id eq "APP-ID-HERE"'
all_params = ' and '.join((event_type, target_type, app_id, outcome))

# Lower bound of the query window. Despite the variable's name, it reaches
# back one full day from the naive UTC clock.
last_hour_date_time = datetime.utcnow() - timedelta(days=1)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

# Ordered, duplicate-free list of affected users (a list, not a set).
unique_set = []
class Events:
    """Collect failed deprovisioning events and report the users to Slack."""

    # Seconds to wait for either HTTP call; requests never times out on its
    # own unless a timeout is passed.
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _failed_user(event_data):
        """Return the ``alternateId`` of the event's second target, or None.

        None is returned when the event has fewer than two targets or the
        second target has no ``alternateId``.
        """
        targets = event_data.get('target') or []
        if len(targets) < 2:
            return None
        return targets[1].get('alternateId')

    @staticmethod
    def _build_payload(users):
        """Return the Slack message body for *users* as a JSON string."""
        text = " Twilio Flex provisioing failure. Please check the following users: \n %s " % '\n'.join(users)
        # json.dumps escapes the newlines and any quotes in the user names;
        # raw control characters inside a JSON string are invalid.
        return json.dumps({"text": text})

    def okta_auth(self):
        """Query the log endpoint and alert Slack when events came back.

        Each event's second target is added to the module-level
        ``unique_set`` (order kept, duplicates skipped). ``post_slack`` is
        called when at least one event was returned; otherwise the process
        exits with status 0.

        Raises:
            requests.HTTPError: if the log endpoint answers with a 4xx/5xx
                status.
        """
        response = requests.get(
            api_url,
            params={'filter': all_params, 'since': since},
            headers={'Accept': 'application/json', 'authorization': key},
            timeout=self.REQUEST_TIMEOUT,
        )
        # Stop on an error status instead of iterating an error payload.
        response.raise_for_status()

        event_list = []
        for event_data in response.json():
            # "outcome" may be absent or null; fall back to {}.
            event_list.append((event_data.get('outcome') or {}).get('reason'))
            user = self._failed_user(event_data)
            if user is not None and user not in unique_set:
                unique_set.append(user)

        if event_list:
            self.post_slack()
        else:
            sys.exit(0)

    def post_slack(self):
        """Post the users gathered in ``unique_set`` to the Slack webhook.

        Raises:
            requests.HTTPError: if the webhook answers with a 4xx/5xx status.
        """
        response = requests.post(
            slack_url,
            # The body is JSON, so declare it as such.
            headers={'Content-Type': 'application/json'},
            data=self._build_payload(unique_set),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()


if __name__ == "__main__":
    Events().okta_auth()
我是一个 Python 新手,正在寻找最佳实践建议以及解决此问题的方法。我能够成功地从 Okta API 中提取我需要的数据,得到的输出是 "PRECONDITION FAILED. Errors reported by remote server: Failed to update. Resource changed on the server."。但这只实现了我想要的一半。我希望能够每 60 分钟检查一次该输出,如果不存在则什么都不做。我以为下面代码中的 "start" 函数就是这样做的,但它似乎没有按照我想要的方式运行。如何将 JSON 对象存储在变量 events 中,或者在对象不存在时退出应用程序?先谢谢您!
import requests
import os
import json
import time
from datetime import datetime, timedelta
# Value sent as the "authorization" header; a missing OKTA_AUTH variable
# raises KeyError here, before any request is made.
key = os.environ['OKTA_AUTH']

# Endpoint that the log query is sent to.
api_url = 'https://domain.okta.com/api/v1/logs'

# Individual filter clauses, combined with "and" into one filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
app_id = 'target.id eq "*******"'
all_params = ' and '.join((event_type, app_id, outcome))

# Lower bound of the query window: one hour before the script started,
# taken from the naive UTC clock and rendered with a literal ".000Z" suffix.
last_hour_date_time = datetime.utcnow() - timedelta(hours=1)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
def auth_okta():
    """Fetch the events matching the module-level filter from ``api_url``.

    Sends a GET request carrying ``all_params`` as the ``filter`` query
    parameter and ``since`` as the lower time bound, with ``key`` in the
    ``authorization`` header. The URL is printed before the request is made.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    print(api_url)
    response = requests.get(
        api_url,
        params={'filter': all_params, 'since': since},
        headers={'Accept': 'application/json', 'authorization': key},
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    # Fail loudly on an error status instead of returning the error payload
    # to a caller that iterates the result as a list of events.
    response.raise_for_status()
    return response.json()
def start():
    """Print the failure reason of every event returned by ``auth_okta``.

    An event whose ``outcome`` or ``reason`` is missing, null or empty prints
    ``'nothing there'`` instead. An empty result prints nothing at all,
    because the loop body never runs.
    """
    for event_data in auth_okta():
        # "outcome" may be absent or null; fall back to {} so the second
        # lookup returns None instead of raising.
        events = (event_data.get('outcome') or {}).get('reason')
        print(events if events else 'nothing there')


start()
尝试在您的 event_data 上使用 .get 方法:
# Use .get on event_data so that a missing key yields None instead of raising.
# Poll until at least one event carries a failure reason. auth_okta() is
# called again on every pass so that fresh data is inspected each time.
while True:
    events = []
    for event_data in auth_okta():
        # The empty-dict fallback also covers an "outcome" that is null.
        reason = (event_data.get("outcome") or {}).get("reason")
        if reason is not None:
            events.append(reason)
    if events:
        break
    print("Nothing found... waiting")
    time.sleep(3600)  # wait one hour
for reason in events:
    print(reason)
您可以使用 sys.exit() 结束所有执行。
import sys

# Nothing was found: end the program with a success status.
# Identity comparison is the correct test for None.
if events is None:
    sys.exit(0)
似乎我的代码初始结构创建了太多列表,我最终重构了整个脚本。我也听取了格雷格的一些建议。
import requests
import sys
import os
import json
import time
from datetime import datetime, timedelta
# Value sent as the "authorization" header; a missing OKTA_AUTH variable
# raises KeyError here, before any request is made.
key = os.environ['OKTA_AUTH']

# Endpoints: the log query target and the Slack webhook that receives alerts.
api_url = 'https://DOMAIN.okta.com/api/v1/logs'
slack_url = "SLACK WEBHOOK URL"

# Individual filter clauses, combined with "and" into one filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
target_type = 'target.type eq "User"'
app_id = 'target.id eq "APP-ID-HERE"'
all_params = ' and '.join((event_type, target_type, app_id, outcome))

# Lower bound of the query window. Despite the variable's name, it reaches
# back one full day from the naive UTC clock.
last_hour_date_time = datetime.utcnow() - timedelta(days=1)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

# Ordered, duplicate-free list of affected users (a list, not a set).
unique_set = []
class Events:
    """Collect failed deprovisioning events and report the users to Slack."""

    # Seconds to wait for either HTTP call; requests never times out on its
    # own unless a timeout is passed.
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _failed_user(event_data):
        """Return the ``alternateId`` of the event's second target, or None.

        None is returned when the event has fewer than two targets or the
        second target has no ``alternateId``.
        """
        targets = event_data.get('target') or []
        if len(targets) < 2:
            return None
        return targets[1].get('alternateId')

    @staticmethod
    def _build_payload(users):
        """Return the Slack message body for *users* as a JSON string."""
        text = " Twilio Flex provisioing failure. Please check the following users: \n %s " % '\n'.join(users)
        # json.dumps escapes the newlines and any quotes in the user names;
        # raw control characters inside a JSON string are invalid.
        return json.dumps({"text": text})

    def okta_auth(self):
        """Query the log endpoint and alert Slack when events came back.

        Each event's second target is added to the module-level
        ``unique_set`` (order kept, duplicates skipped). ``post_slack`` is
        called when at least one event was returned; otherwise the process
        exits with status 0.

        Raises:
            requests.HTTPError: if the log endpoint answers with a 4xx/5xx
                status.
        """
        response = requests.get(
            api_url,
            params={'filter': all_params, 'since': since},
            headers={'Accept': 'application/json', 'authorization': key},
            timeout=self.REQUEST_TIMEOUT,
        )
        # Stop on an error status instead of iterating an error payload.
        response.raise_for_status()

        event_list = []
        for event_data in response.json():
            # "outcome" may be absent or null; fall back to {}.
            event_list.append((event_data.get('outcome') or {}).get('reason'))
            user = self._failed_user(event_data)
            if user is not None and user not in unique_set:
                unique_set.append(user)

        if event_list:
            self.post_slack()
        else:
            sys.exit(0)

    def post_slack(self):
        """Post the users gathered in ``unique_set`` to the Slack webhook.

        Raises:
            requests.HTTPError: if the webhook answers with a 4xx/5xx status.
        """
        response = requests.post(
            slack_url,
            # The body is JSON, so declare it as such.
            headers={'Content-Type': 'application/json'},
            data=self._build_payload(unique_set),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()


if __name__ == "__main__":
    Events().okta_auth()