使用 Twisted 读取 API 响应
Reading API response using Twisted
我在使用 python Twisted 网络客户端调用 API 后尝试读取响应。我已经对传入 json 结构的端点进行了 POST 调用,然后它应该 return 带有消息(如果失败)或 json 结构的状态,如果成功。
使用下面的代码,我可以看到正在调用消息以及状态代码,但我没有看到 message/json 结构。
'BeginningPrinter' 从来没有被调用过,我不明白为什么。
输出示例:
$ python sample.py
Response version: (b'HTTP', 1, 0)
Response code: 401 | phrase : b'UNAUTHORIZED'
Response headers:
Response length: 28
抱歉,代码太长了,但我想确保它包含我在其中 运行 使用的所有内容。
from io import BytesIO
import json
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import FileBodyProducer
agent = Agent(reactor)
class BeginningPrinter(Protocol):
def __init__(self, finished):
self.finished = finished
self.remaining = 1024 * 10
print('begin')
def dataReceived(self, bytes):
print('bytes')
if self.remaining:
display = bytes[:self.remaining]
print('Some data received:')
print(display)
self.remaining -= len(display)
def connectionLost(self, reason):
print('Finished receiving body:', reason.getErrorMessage())
self.finished.callback(None)
TESTDATA = { "keySequence": "2019-07-14" }
jsonData = json.dumps(TESTDATA)
body = BytesIO(jsonData.encode('utf-8'))
body = FileBodyProducer(body)
headerDict = \
{
'User-Agent': ['test'],
'Content-Type': ['application/json'],
'APIGUID' : ['ForTesting']
}
header = Headers(headerDict)
d = agent.request(b'POST', b' http://127.0.0.1:5000/receiveKeyCode', header, body)
def cbRequest(response):
print(f'Response version: {response.version}')
print(f'Response code: {response.code} | phrase : {response.phrase}')
print('Response headers:')
print('Response length:', response.length)
print(pformat(list(response.headers.getAllRawHeaders())))
print(response.deliverBody)
finished = Deferred()
response.deliverBody(BeginningPrinter(finished))
return finished
d.addCallback(cbRequest)
def cbShutdown(ignored):
#reactor.stop()
pass
d.addBoth(cbShutdown)
reactor.run()
你不需要所有这些无意义的代码,如果你已经在使用 Flask 那么你可以写入 API 并在几行中取回值,如果你不是那么它使pip install 是有意义的,因为它让生活变得更轻松。
import json
import requests
headers = {
'content-type': 'application/json',
'APIGUID' : 'ForTesting'
}
conv = {"keySequence": "2019-07-14"}
s = json.dumps(conv)
res = requests.post("http://127.0.0.1:5000/receiveKeyCode",data=s, headers=headers)
print(res.text)
参考:见this Whosebug link
我在使用 python Twisted 网络客户端调用 API 后尝试读取响应。我已经对传入 json 结构的端点进行了 POST 调用,然后它应该 return 带有消息(如果失败)或 json 结构的状态,如果成功。
使用下面的代码,我可以看到正在调用消息以及状态代码,但我没有看到 message/json 结构。
'BeginningPrinter' 从来没有被调用过,我不明白为什么。
输出示例:
$ python sample.py
Response version: (b'HTTP', 1, 0)
Response code: 401 | phrase : b'UNAUTHORIZED'
Response headers:
Response length: 28
抱歉,代码太长了,但我想确保它包含我在其中 运行 使用的所有内容。
from io import BytesIO
import json
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import FileBodyProducer
agent = Agent(reactor)
class BeginningPrinter(Protocol):
def __init__(self, finished):
self.finished = finished
self.remaining = 1024 * 10
print('begin')
def dataReceived(self, bytes):
print('bytes')
if self.remaining:
display = bytes[:self.remaining]
print('Some data received:')
print(display)
self.remaining -= len(display)
def connectionLost(self, reason):
print('Finished receiving body:', reason.getErrorMessage())
self.finished.callback(None)
TESTDATA = { "keySequence": "2019-07-14" }
jsonData = json.dumps(TESTDATA)
body = BytesIO(jsonData.encode('utf-8'))
body = FileBodyProducer(body)
headerDict = \
{
'User-Agent': ['test'],
'Content-Type': ['application/json'],
'APIGUID' : ['ForTesting']
}
header = Headers(headerDict)
d = agent.request(b'POST', b' http://127.0.0.1:5000/receiveKeyCode', header, body)
def cbRequest(response):
print(f'Response version: {response.version}')
print(f'Response code: {response.code} | phrase : {response.phrase}')
print('Response headers:')
print('Response length:', response.length)
print(pformat(list(response.headers.getAllRawHeaders())))
print(response.deliverBody)
finished = Deferred()
response.deliverBody(BeginningPrinter(finished))
return finished
d.addCallback(cbRequest)
def cbShutdown(ignored):
#reactor.stop()
pass
d.addBoth(cbShutdown)
reactor.run()
你不需要所有这些无意义的代码,如果你已经在使用 Flask 那么你可以写入 API 并在几行中取回值,如果你不是那么它使pip install 是有意义的,因为它让生活变得更轻松。
import json
import requests
headers = {
'content-type': 'application/json',
'APIGUID' : 'ForTesting'
}
conv = {"keySequence": "2019-07-14"}
s = json.dumps(conv)
res = requests.post("http://127.0.0.1:5000/receiveKeyCode",data=s, headers=headers)
print(res.text)
参考:见this Whosebug link