简化流式 request.get 和 JSON 响应解码
Simplify a streamed request.get and JSON response decode
我一直在编写一些代码,这些代码将从名为 PulsePoint 的服务中获取紧急事件信息。它与内置于计算机控制调度中心的软件配合使用。
这是一款应用程序,可让接受过心肺复苏术培训的公民英雄在急救人员到达现场之前提供帮助。我只是用它来处理其他紧急事件。
我在那里对应用程序进行了逆向工程,因为他们没有关于如何提出您自己的请求的文档。因此,我故意留下了 api 密钥和身份验证信息,因为它在 Android 清单文件中以纯文本形式存在。
我最终肯定会制作一个 python 模块来与此服务交互,目前它只是一团糟。
总之,对于冗长无聊的介绍感到抱歉。
我真正的问题是,如何简化此函数,使其在发出定时请求和返回可通过下标使用的 json 对象时看起来和运行起来更简洁?
import requests, time, json
def getjsonobject(agency):
startsecond = time.strftime("%S")
url = REDACTED
body = []
currentagency = requests.get(url=url, verify=False, stream=True, auth=requests.auth.HTTPBasicAuth(REDACTED, REDCATED), timeout = 13)
for chunk in currentagency.iter_content(1024):
body.append(chunk)
if(int(startsecond) + 5 < int(time.strftime("%S"))): #Shitty internet proof, with timeout above
raise Exception("Server sent to much data")
jsonstringforagency = str(b''.join(body))[2:][:-1] #Removes charecters that define the response body so that the next line doesnt error
currentagencyjson = json.loads(jsonstringforagency) #Loads response as decodable JSON
return currentagencyjson
currentincidents = getjsonobject("lafdw")
for inci in currentincidents["incidents"]["active"]:
print(inci["FullDisplayAddress"])
Requests 处理获取正文数据、检查 json 并自动为您解析 json,并且由于您给出了 timeout
arg,我不认为您需要单独的超时处理。 Request 还为 get 请求处理构建 URL,因此您可以将查询信息放入字典中,这样就更好了。结合这些更改并删除未使用的导入可以得到:
import requests
params = dict(both=1,
minimal=1,
apikey=REDACTED)
url = REDACTED
def getjsonobject(agency):
myParams = dict(params, agency=agency)
return requests.get(url, verify=False, params=myParams, stream=True,
auth=requests.auth.HTTPBasicAuth(REDACTED, REDACTED),
timeout = 13).json()
这为我提供了相同的输出。
我一直在编写一些代码,这些代码将从名为 PulsePoint 的服务中获取紧急事件信息。它与内置于计算机控制调度中心的软件配合使用。
这是一款应用程序,可让接受过心肺复苏术培训的公民英雄在急救人员到达现场之前提供帮助。我只是用它来处理其他紧急事件。
我在那里对应用程序进行了逆向工程,因为他们没有关于如何提出您自己的请求的文档。因此,我故意留下了 api 密钥和身份验证信息,因为它在 Android 清单文件中以纯文本形式存在。
我最终肯定会制作一个 python 模块来与此服务交互,目前它只是一团糟。
总之,对于冗长无聊的介绍感到抱歉。
我真正的问题是,如何简化此函数,使其在发出定时请求和返回可通过下标使用的 json 对象时看起来和运行起来更简洁?
import requests, time, json
def getjsonobject(agency):
startsecond = time.strftime("%S")
url = REDACTED
body = []
currentagency = requests.get(url=url, verify=False, stream=True, auth=requests.auth.HTTPBasicAuth(REDACTED, REDCATED), timeout = 13)
for chunk in currentagency.iter_content(1024):
body.append(chunk)
if(int(startsecond) + 5 < int(time.strftime("%S"))): #Shitty internet proof, with timeout above
raise Exception("Server sent to much data")
jsonstringforagency = str(b''.join(body))[2:][:-1] #Removes charecters that define the response body so that the next line doesnt error
currentagencyjson = json.loads(jsonstringforagency) #Loads response as decodable JSON
return currentagencyjson
currentincidents = getjsonobject("lafdw")
for inci in currentincidents["incidents"]["active"]:
print(inci["FullDisplayAddress"])
Requests 处理获取正文数据、检查 json 并自动为您解析 json,并且由于您给出了 timeout
arg,我不认为您需要单独的超时处理。 Request 还为 get 请求处理构建 URL,因此您可以将查询信息放入字典中,这样就更好了。结合这些更改并删除未使用的导入可以得到:
import requests
params = dict(both=1,
minimal=1,
apikey=REDACTED)
url = REDACTED
def getjsonobject(agency):
myParams = dict(params, agency=agency)
return requests.get(url, verify=False, params=myParams, stream=True,
auth=requests.auth.HTTPBasicAuth(REDACTED, REDACTED),
timeout = 13).json()
这为我提供了相同的输出。