Python:如果 JSON 对象为空则什么也不做

Python do nothing IF empty JSON object

我是一名 Python 新手,正在寻求最佳实践建议以及这个问题的解决办法。我已经能够成功地从 Okta API 中提取所需的数据,得到的输出是 "PRECONDITION FAILED. Errors reported by remote server: Failed to update. Resource changed on the server."。但这只完成了我想要的一半:我希望每 60 分钟检查一次是否有该输出,如果没有就什么都不做。我以为下面代码中的 "start" 函数做的就是这件事,但它的行为似乎并不符合我的预期。如何把这些 JSON 对象存入变量 events,或者在没有这些对象时退出应用程序?先谢谢了!

import requests
import os
import json
import time
from datetime import datetime, timedelta

# Okta credential; sent verbatim as the `authorization` header by auth_okta().
key = os.environ['OKTA_AUTH']

# Individual filter clauses, combined below into a single filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
app_id = 'target.id eq "*******"'
all_params = ' and '.join((event_type, app_id, outcome))

# Okta logs endpoint queried by auth_okta().
api_url = 'https://domain.okta.com/api/v1/logs'

# Start of the query window: one hour before this module was loaded (UTC),
# rendered as an ISO-8601 style timestamp with the milliseconds fixed at 000.
last_hour_date_time = datetime.utcnow() - timedelta(hours=1)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

def auth_okta():
    """Fetch the Okta log events matching the module-level filter.

    Sends one GET request to ``api_url`` with ``all_params`` as the ``filter``
    query parameter and ``since`` as the start of the time window, using
    ``key`` as the value of the ``authorization`` header.

    Returns:
        The decoded JSON body of the response. The callers in this script
        iterate over it as a sequence of event dicts.

    Raises:
        requests.HTTPError: If Okta answers with a 4xx/5xx status.
        requests.Timeout: If Okta does not answer within 30 seconds.
    """
    print(api_url)
    params = {
        'filter': all_params,
        'since': since,
    }
    response = requests.get(
        api_url,
        params=params,
        headers={'Accept': 'application/json', 'authorization': key},
        # Without a timeout, requests waits indefinitely on a stalled server.
        timeout=30,
    )
    # An error response decodes to a JSON object rather than a list of events;
    # fail here instead of letting callers trip over it with a TypeError.
    response.raise_for_status()
    return response.json()

def start():
    """Print the failure reason of every event returned by ``auth_okta``.

    Prints ``'nothing there'`` once when no events came back at all, and also
    for each individual event whose reason is missing or empty.
    """
    event_list = auth_okta()
    # An empty result never enters the loop below, so report it explicitly;
    # otherwise the function would finish without printing anything.
    if not event_list:
        print('nothing there')
        return
    for event_data in event_list:
        # `or {}` keeps the chained lookup safe when "outcome" is absent or null.
        events = (event_data.get('outcome') or {}).get('reason')
        if not events:
            print('nothing there')
        else:
            print(events)

# Run the check only when executed as a script, so that importing this module
# does not immediately issue the Okta request made by start().
if __name__ == "__main__":
    start()

尝试在您的 event_data 上使用 .get 方法:
# Poll Okta until at least one returned event carries a failure reason.
while True:
    reasons = []
    for event_data in auth_okta():
        # `or {}` keeps the chained lookup from raising AttributeError when
        # "outcome" is missing or null.
        events = (event_data.get("outcome") or {}).get("reason")
        if events is not None:
            reasons.append(events)
    if reasons:
        break
    print("Nothing found... waiting")
    time.sleep(3600)  # wait one hour, then query Okta again
for events in reasons:
    print(events)

您可以使用 sys.exit() 结束所有执行。

import sys  # needed for sys.exit(); the script above does not import it

# Stop the program with a success status when there is nothing to report.
# `is None` is the identity check for the None singleton.
if events is None:
    sys.exit(0)

似乎我的代码初始结构创建了太多列表,我最终重构了整个脚本。我也听取了格雷格的一些建议。

import requests
import sys
import os
import json
import time
from datetime import datetime, timedelta

# Okta credential; sent verbatim as the `authorization` header by Events.okta_auth().
key = os.environ['OKTA_AUTH']

# Endpoints: the Okta logs API that is queried and the Slack webhook that is notified.
api_url = 'https://DOMAIN.okta.com/api/v1/logs'
slack_url = "SLACK WEBHOOK URL"

# Individual filter clauses, combined below into a single filter expression.
outcome = 'outcome.result eq "FAILURE"'
event_type = 'eventType eq "application.provision.user.deactivate"'
target_type = 'target.type eq "User"'
app_id = 'target.id eq "APP-ID-HERE"'
all_params = ' and '.join((event_type, target_type, app_id, outcome))

# Start of the query window. NOTE: despite the variable's name, this is one
# full day (not one hour) before the module was loaded, in UTC.
last_hour_date_time = datetime.utcnow() - timedelta(hours=24)
since = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

# Distinct alternateId values collected by Events.okta_auth(), in first-seen order.
unique_set = []


class Events:
    """Query Okta for failed deprovisioning events and report them to Slack."""

    def okta_auth(self):
        """Fetch matching Okta log events and alert Slack if there are any.

        Sends one GET request to ``api_url`` using the module-level
        ``all_params`` filter, ``since`` timestamp and ``key`` credential.
        For every returned event, the ``alternateId`` of its second target is
        added to the module-level ``unique_set`` (duplicates are skipped).
        When at least one event was returned, ``post_slack`` is called;
        otherwise the process exits with status 0 via ``sys.exit``.

        Raises:
            requests.HTTPError: If Okta answers with a 4xx/5xx status.
            requests.Timeout: If Okta does not answer within 30 seconds.
            SystemExit: With code 0 when Okta returned no events.
        """
        params = {
            'filter': all_params,
            'since': since,
        }
        response = requests.get(
            api_url,
            params=params,
            headers={'Accept': 'application/json', 'authorization': key},
            # Without a timeout, requests waits indefinitely on a stalled server.
            timeout=30,
        )
        # An error response decodes to a JSON object, not a list of events;
        # fail here rather than with a TypeError inside the loop below.
        response.raise_for_status()

        event_list = []
        for event_data in response.json():
            event_list.append(event_data['outcome']['reason'])
            alternate_ids = [target['alternateId'] for target in event_data['target']]
            # NOTE(review): index 1 is kept from the original code; presumably
            # the second target is the affected user - confirm against real events.
            user = alternate_ids[1]
            if user not in unique_set:
                unique_set.append(user)

        if event_list:
            self.post_slack()
        else:
            sys.exit(0)

    @staticmethod
    def _slack_payload(users):
        """Return the JSON request body listing ``users``, one per line.

        ``json.dumps`` escapes the newlines and any quotes or backslashes in
        the user ids, so the body is always valid JSON.
        """
        user_lines = '\n'.join(users)
        text = " Twilio Flex provisioing failure. Please check the following users: \n %s " % user_lines
        return json.dumps({"text": text})

    def post_slack(self):
        """POST the users collected in ``unique_set`` to ``slack_url``."""
        requests.post(
            slack_url,
            # The body is JSON, so declare it as such.
            headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
            data=self._slack_payload(unique_set),
            timeout=30,
        )

if __name__ == "__main__":
    # Script entry point: run a single Okta check, which either alerts Slack
    # or exits when there is nothing to report.
    checker = Events()
    checker.okta_auth()