tornado 中 Flask API 的替代品(Dialogflow webhook)
Alternatives to Flask APIs in Tornado (Dialogflow webhook)
我需要在 Tornado 服务器中为 Dialogflow 创建一个 webhook。我以前用 Flask 做过这件事,不过这两个框架我都还是初学者。
Flask 的 webhook 代码:
from flask import render_template
import os
from flask import Flask
from flask import request
from flask import make_response
import json
import time
# Flask application object; the route decorators that follow register views on it.
app = Flask(__name__)
@app.route("/")
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route('/webhook', methods=['POST', 'GET'])
def webhook():
    """Handle a Dialogflow webhook call and answer with a JSON document.

    The request body is read with ``get_json(silent=True, force=True)``,
    passed to ``makeWebhookResult()``, and the result is serialised with
    ``json.dumps`` and returned with a JSON Content-Type header. Both the
    request and the response are printed for debugging.
    """
    payload = request.get_json(silent=True, force=True)
    print(json.dumps(payload, indent=4))

    body = json.dumps(makeWebhookResult(payload), indent=4)
    print("+++++++++++++++RESPONSE+++++++++++++++++", body)

    reply = make_response(body)
    reply.headers['Content-Type'] = 'application/json'
    return reply
# Right now the reply is a fixed placeholder, just to see if the round trip works.
def makeWebhookResult(req):
    """Build the fulfillment reply for a Dialogflow webhook request.

    Args:
        req: The parsed request body. Anything that is not a dict
            (including ``None``) is treated as an empty request.

    Returns:
        A dict with the ``fulfillmentText`` and ``source`` keys.
    """
    # Tolerate a missing body or a missing/odd "queryResult" entry instead
    # of raising AttributeError on it.
    query_result = req.get('queryResult') if isinstance(req, dict) else None
    # The query text is extracted but not used yet: the reply below is a
    # hard-coded placeholder.
    speech = query_result.get("queryText") if isinstance(query_result, dict) else None
    return {
        "fulfillmentText": 'YOLO',
        "source": 'App'
    }
# To expose the local server during development: ./ngrok http 8090
if __name__ == '__main__':
    # The PORT environment variable wins; 8090 is the fallback.
    port = int(os.environ.get('PORT', 8090))
    print("Starting app on port %d" % port)
    app.run(host='localhost', port=port, debug=True)
现在我在 Tornado 中是这样尝试的:
import tornado.ioloop
import tornado.web as web
import tornado
import json
import os
# Directory of static assets: the "static" folder next to this script.
static_root = os.path.join(os.path.dirname(__file__), 'static')
class MainHandler(tornado.web.RequestHandler):
    """Handler for the site root: renders the index page."""

    def get(self):
        """Respond to GET with the rendered ``index.html`` template."""
        self.render("./templates/index.html")
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook handler, kept as originally attempted.

    NOTE(review): only ``prepare()`` is overridden and no ``post()`` is
    defined, so this class has no method that serves POST requests.
    """

    def prepare(self):
        """Parse a JSON request body and try to build the reply from it."""
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            return None

        self.req = json.loads(self.request.body)
        print(self.req)
        print(json.dumps(self.req, indent=4))

        reply = json.dumps(self.webhook_result(self.req), indent=4)
        # NOTE(review): ``reply`` is a str here, so the next line raises
        # AttributeError (a str has no ``headers`` attribute).
        reply.headers['Content-Type'] = 'application/json'
        return reply

    def webhook_result(self, *args):
        """Return the placeholder fulfillment reply.

        Positional arguments are accepted but ignored; the query text is
        read from ``self.req`` and printed.
        """
        speech = self.req.get('queryResult').get("queryText")
        print(speech)
        return {"fulfillmentText": 'YOLO', "source": 'App'}
# URL routing table.
# NOTE(review): the catch-all r'(.*)' entry is listed before '/webhook';
# if patterns are tried in order it presumably captures '/webhook' first,
# so the Webhook entry is never reached. Confirm against the Tornado docs.
handlers = [
    (r'/', MainHandler),
    (r'(.*)', web.StaticFileHandler, {'path': static_root}),
    (r'/webhook', Webhook),
]

settings = {
    'debug': True,
    'static_path': static_root,
}

application = web.Application(handlers, **settings)

if __name__ == "__main__":
    port = 8090
    application.listen(port)
    print(f"Server running on port : {port}")
    # Blocks here, serving requests until the process is stopped.
    tornado.ioloop.IOLoop.instance().start()
这段代码在 Flask 下运行良好。
但当我通过 Tornado 运行(使用 ngrok 做隧道转发)时,会得到一条警告:WARNING:tornado.access:405 POST /webhook (127.0.0.1) 0.83ms
我阅读了 Tornado 的文档,但仍然弄不清楚该怎么做。我猜问题出在 Webhook 类上。我到底哪里做错了?
警告表明 POST 请求出了问题——要处理 POST 请求,您需要在 class Webhook 中定义方法 def post()。
处理请求的方法应该是 post(),而不是 prepare()(prepare() 的用途不同:它会在 get()/post() 等方法之前被调用)。
您可以使用 self.write(dictionary) 把字典以 'application/json' 的形式发送出去:
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook endpoint; POST requests are served by ``post()``."""

    def post(self):
        """Reply to a JSON POST with the dict returned by ``webhook_result()``.

        A request without an ``application/json`` Content-Type gets status
        415, and a body that is not valid JSON gets status 400; both carry
        a JSON object with an ``error`` key.
        """
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            self.set_status(415)
            self.write({'error': 'Wrong Content-Type'})  # it will send as JSON
            return

        try:
            data_input = json.loads(self.request.body)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; report the bad body instead of failing with a 500.
            self.set_status(400)
            self.write({'error': 'Invalid JSON'})  # it will send as JSON
            return

        print('data_input:', data_input)
        print('data_input json.dumps:', json.dumps(data_input, indent=4))

        data_output = self.webhook_result(data_input)  # get as normal dict, not string
        print('data_output:', data_output)
        print('data_output json.dumps:', json.dumps(data_output, indent=4))

        self.write(data_output)  # it will send as JSON
顺便说一句:既然已经把值作为参数传给了 webhook_result(),就可以在方法里通过参数(例如命名为 data)拿到它,并用它来代替 self.req:
def webhook_result(self, data):
    """Build the fulfillment reply from the parsed request data.

    Args:
        data: The parsed request body. Anything that is not a dict
            (including ``None``) is treated as an empty request.

    Returns:
        A dict with the ``fulfillmentText`` and ``source`` keys.
    """
    # Tolerate a missing or malformed "queryResult" entry instead of
    # raising AttributeError on it.
    query_result = data.get('queryResult') if isinstance(data, dict) else None
    speech = query_result.get("queryText") if isinstance(query_result, dict) else None
    print('speech:', speech)
    # The reply is a fixed placeholder; the query text is only printed.
    return {
        "fulfillmentText": 'YOLO',
        "source": 'App'
    }
我测试过的完整代码:
import tornado
import tornado.web
import json
import os
# NOTE: os.path.dirname('.') is the empty string, so this is the relative
# path 'static', resolved against the current working directory.
static_root = os.path.join(os.path.dirname('.'), 'static')
class MainHandler(tornado.web.RequestHandler):
    """Root page with a bare form for exercising the webhook by hand."""

    def get(self):
        """Write a one-button HTML form that POSTs to ``/webhook``."""
        # The real page would be rendered with:
        #     self.render("./templates/index.html")
        # The form is used to test a POST request with a wrong Content-Type.
        form_html = '''<form action="/webhook" method="POST"><button>SUBMIT</button></form>'''
        self.write(form_html)
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook endpoint.

    POST requests are served by ``post()``; ``webhook_result()`` builds the
    reply from the parsed request data.
    """

    def post(self):
        """Reply to a JSON POST with the dict returned by ``webhook_result()``.

        A request without an ``application/json`` Content-Type gets status
        415, and a body that is not valid JSON gets status 400; both carry
        a JSON object with an ``error`` key.
        """
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            self.set_status(415)
            self.write({'error': 'Wrong Content-Type'})  # it will send as JSON
            return

        try:
            data_input = json.loads(self.request.body)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; report the bad body instead of failing with a 500.
            self.set_status(400)
            self.write({'error': 'Invalid JSON'})  # it will send as JSON
            return

        print('data_input:', data_input)
        print('data_input json.dumps:', json.dumps(data_input, indent=4))

        data_output = self.webhook_result(data_input)  # get as normal dict, not string
        print('data_output:', data_output)
        print('data_output json.dumps:', json.dumps(data_output, indent=4))

        self.write(data_output)  # it will send as JSON

    def webhook_result(self, data):
        """Build the fulfillment reply from the parsed request data.

        Args:
            data: The parsed request body. Anything that is not a dict
                (including ``None``) is treated as an empty request.

        Returns:
            A dict with the ``fulfillmentText`` and ``source`` keys.
        """
        # Tolerate a missing or malformed "queryResult" entry instead of
        # raising AttributeError on it.
        query_result = data.get('queryResult') if isinstance(data, dict) else None
        speech = query_result.get("queryText") if isinstance(query_result, dict) else None
        print('speech:', speech)
        # The reply is a fixed placeholder; the query text is only printed.
        return {
            "fulfillmentText": 'YOLO',
            "source": 'App'
        }
# URL routing table.
handlers = [
    (r'/', MainHandler),
    (r'/webhook', Webhook),
    # A catch-all static route, if wanted, should probably come last so
    # that it does not shadow the routes above:
    # (r'(.*)', tornado.web.StaticFileHandler, {'path': static_root}),
]

settings = dict(
    debug=True,
    static_path=static_root
)

application = tornado.web.Application(handlers, **settings)

if __name__ == "__main__":
    port = 8090
    application.listen(port)
    print(f"Running: http://127.0.0.1:{port}")
    # IOLoop.current() replaces the deprecated IOLoop.instance() alias.
    tornado.ioloop.IOLoop.current().start()
我用来发送带有 JSON 数据的 POST 请求的代码:
import requests
# Send a sample Dialogflow-style payload to the locally running webhook
# and print what comes back.
url = 'http://127.0.0.1:8090/webhook'
data = {'queryResult': {'queryText': 'Hello World'}}
# json=data sends the dict as a JSON request body.
r = requests.post(url, json=data)
print(r.status_code)
print(r.headers.get('Content-Type'))
# Decode the response body as JSON.
print(r.json())
顺便说一句:在 Flask 中可以用 jsonify 简化(需要先 from flask import jsonify):
@app.route('/webhook', methods=['POST', 'GET'])
def webhook():
    """Shorter Flask view: let ``jsonify`` build the JSON response.

    The request body is read with ``get_json(silent=True, force=True)``,
    handed to ``makeWebhookResult()``, and the result is returned through
    ``jsonify``.
    """
    payload = request.get_json(silent=True, force=True)
    return jsonify(makeWebhookResult(payload))
我需要在 Tornado 服务器中为 Dialogflow 创建一个 webhook。我以前用 Flask 做过这件事,不过这两个框架我都还是初学者。
Flask 的 webhook 代码:
from flask import render_template
import os
from flask import Flask
from flask import request
from flask import make_response
import json
import time
# Flask application object; the route decorators that follow register views on it.
app = Flask(__name__)
@app.route("/")
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route('/webhook', methods=['POST', 'GET'])
def webhook():
    """Handle a Dialogflow webhook call and answer with a JSON document.

    The request body is read with ``get_json(silent=True, force=True)``,
    passed to ``makeWebhookResult()``, and the result is serialised with
    ``json.dumps`` and returned with a JSON Content-Type header. Both the
    request and the response are printed for debugging.
    """
    payload = request.get_json(silent=True, force=True)
    print(json.dumps(payload, indent=4))

    body = json.dumps(makeWebhookResult(payload), indent=4)
    print("+++++++++++++++RESPONSE+++++++++++++++++", body)

    reply = make_response(body)
    reply.headers['Content-Type'] = 'application/json'
    return reply
# Right now the reply is a fixed placeholder, just to see if the round trip works.
def makeWebhookResult(req):
    """Build the fulfillment reply for a Dialogflow webhook request.

    Args:
        req: The parsed request body. Anything that is not a dict
            (including ``None``) is treated as an empty request.

    Returns:
        A dict with the ``fulfillmentText`` and ``source`` keys.
    """
    # Tolerate a missing body or a missing/odd "queryResult" entry instead
    # of raising AttributeError on it.
    query_result = req.get('queryResult') if isinstance(req, dict) else None
    # The query text is extracted but not used yet: the reply below is a
    # hard-coded placeholder.
    speech = query_result.get("queryText") if isinstance(query_result, dict) else None
    return {
        "fulfillmentText": 'YOLO',
        "source": 'App'
    }
# To expose the local server during development: ./ngrok http 8090
if __name__ == '__main__':
    # The PORT environment variable wins; 8090 is the fallback.
    port = int(os.environ.get('PORT', 8090))
    print("Starting app on port %d" % port)
    app.run(host='localhost', port=port, debug=True)
现在我在 Tornado 中是这样尝试的:
import tornado.ioloop
import tornado.web as web
import tornado
import json
import os
# Directory of static assets: the "static" folder next to this script.
static_root = os.path.join(os.path.dirname(__file__), 'static')
class MainHandler(tornado.web.RequestHandler):
    """Handler for the site root: renders the index page."""

    def get(self):
        """Respond to GET with the rendered ``index.html`` template."""
        self.render("./templates/index.html")
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook handler, kept as originally attempted.

    NOTE(review): only ``prepare()`` is overridden and no ``post()`` is
    defined, so this class has no method that serves POST requests.
    """

    def prepare(self):
        """Parse a JSON request body and try to build the reply from it."""
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            return None

        self.req = json.loads(self.request.body)
        print(self.req)
        print(json.dumps(self.req, indent=4))

        reply = json.dumps(self.webhook_result(self.req), indent=4)
        # NOTE(review): ``reply`` is a str here, so the next line raises
        # AttributeError (a str has no ``headers`` attribute).
        reply.headers['Content-Type'] = 'application/json'
        return reply

    def webhook_result(self, *args):
        """Return the placeholder fulfillment reply.

        Positional arguments are accepted but ignored; the query text is
        read from ``self.req`` and printed.
        """
        speech = self.req.get('queryResult').get("queryText")
        print(speech)
        return {"fulfillmentText": 'YOLO', "source": 'App'}
# URL routing table.
# NOTE(review): the catch-all r'(.*)' entry is listed before '/webhook';
# if patterns are tried in order it presumably captures '/webhook' first,
# so the Webhook entry is never reached. Confirm against the Tornado docs.
handlers = [
    (r'/', MainHandler),
    (r'(.*)', web.StaticFileHandler, {'path': static_root}),
    (r'/webhook', Webhook),
]

settings = {
    'debug': True,
    'static_path': static_root,
}

application = web.Application(handlers, **settings)

if __name__ == "__main__":
    port = 8090
    application.listen(port)
    print(f"Server running on port : {port}")
    # Blocks here, serving requests until the process is stopped.
    tornado.ioloop.IOLoop.instance().start()
这段代码在 Flask 下运行良好。但当我通过 Tornado 运行(使用 ngrok 做隧道转发)时,会得到一条警告:WARNING:tornado.access:405 POST /webhook (127.0.0.1) 0.83ms
我阅读了 Tornado 的文档,但仍然弄不清楚该怎么做。我猜问题出在 Webhook 类上。我到底哪里做错了?
警告表明 POST 请求出了问题——要处理 POST 请求,您需要在 class Webhook 中定义方法 def post()。
处理请求的方法应该是 post(),而不是 prepare()(prepare() 的用途不同:它会在 get()/post() 等方法之前被调用)。
您可以使用 self.write(dictionary) 把字典以 'application/json' 的形式发送出去:
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook endpoint; POST requests are served by ``post()``."""

    def post(self):
        """Reply to a JSON POST with the dict returned by ``webhook_result()``.

        A request without an ``application/json`` Content-Type gets status
        415, and a body that is not valid JSON gets status 400; both carry
        a JSON object with an ``error`` key.
        """
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            self.set_status(415)
            self.write({'error': 'Wrong Content-Type'})  # it will send as JSON
            return

        try:
            data_input = json.loads(self.request.body)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; report the bad body instead of failing with a 500.
            self.set_status(400)
            self.write({'error': 'Invalid JSON'})  # it will send as JSON
            return

        print('data_input:', data_input)
        print('data_input json.dumps:', json.dumps(data_input, indent=4))

        data_output = self.webhook_result(data_input)  # get as normal dict, not string
        print('data_output:', data_output)
        print('data_output json.dumps:', json.dumps(data_output, indent=4))

        self.write(data_output)  # it will send as JSON
顺便说一句:既然已经把值作为参数传给了 webhook_result(),就可以在方法里通过参数(例如命名为 data)拿到它,并用它来代替 self.req:
def webhook_result(self, data):
    """Build the fulfillment reply from the parsed request data.

    Args:
        data: The parsed request body. Anything that is not a dict
            (including ``None``) is treated as an empty request.

    Returns:
        A dict with the ``fulfillmentText`` and ``source`` keys.
    """
    # Tolerate a missing or malformed "queryResult" entry instead of
    # raising AttributeError on it.
    query_result = data.get('queryResult') if isinstance(data, dict) else None
    speech = query_result.get("queryText") if isinstance(query_result, dict) else None
    print('speech:', speech)
    # The reply is a fixed placeholder; the query text is only printed.
    return {
        "fulfillmentText": 'YOLO',
        "source": 'App'
    }
我测试过的完整代码:
import tornado
import tornado.web
import json
import os
# NOTE: os.path.dirname('.') is the empty string, so this is the relative
# path 'static', resolved against the current working directory.
static_root = os.path.join(os.path.dirname('.'), 'static')
class MainHandler(tornado.web.RequestHandler):
    """Root page with a bare form for exercising the webhook by hand."""

    def get(self):
        """Write a one-button HTML form that POSTs to ``/webhook``."""
        # The real page would be rendered with:
        #     self.render("./templates/index.html")
        # The form is used to test a POST request with a wrong Content-Type.
        form_html = '''<form action="/webhook" method="POST"><button>SUBMIT</button></form>'''
        self.write(form_html)
class Webhook(tornado.web.RequestHandler):
    """Dialogflow webhook endpoint.

    POST requests are served by ``post()``; ``webhook_result()`` builds the
    reply from the parsed request data.
    """

    def post(self):
        """Reply to a JSON POST with the dict returned by ``webhook_result()``.

        A request without an ``application/json`` Content-Type gets status
        415, and a body that is not valid JSON gets status 400; both carry
        a JSON object with an ``error`` key.
        """
        content_type = self.request.headers.get("Content-Type", "")
        if not content_type.startswith("application/json"):
            self.set_status(415)
            self.write({'error': 'Wrong Content-Type'})  # it will send as JSON
            return

        try:
            data_input = json.loads(self.request.body)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; report the bad body instead of failing with a 500.
            self.set_status(400)
            self.write({'error': 'Invalid JSON'})  # it will send as JSON
            return

        print('data_input:', data_input)
        print('data_input json.dumps:', json.dumps(data_input, indent=4))

        data_output = self.webhook_result(data_input)  # get as normal dict, not string
        print('data_output:', data_output)
        print('data_output json.dumps:', json.dumps(data_output, indent=4))

        self.write(data_output)  # it will send as JSON

    def webhook_result(self, data):
        """Build the fulfillment reply from the parsed request data.

        Args:
            data: The parsed request body. Anything that is not a dict
                (including ``None``) is treated as an empty request.

        Returns:
            A dict with the ``fulfillmentText`` and ``source`` keys.
        """
        # Tolerate a missing or malformed "queryResult" entry instead of
        # raising AttributeError on it.
        query_result = data.get('queryResult') if isinstance(data, dict) else None
        speech = query_result.get("queryText") if isinstance(query_result, dict) else None
        print('speech:', speech)
        # The reply is a fixed placeholder; the query text is only printed.
        return {
            "fulfillmentText": 'YOLO',
            "source": 'App'
        }
# URL routing table.
handlers = [
    (r'/', MainHandler),
    (r'/webhook', Webhook),
    # A catch-all static route, if wanted, should probably come last so
    # that it does not shadow the routes above:
    # (r'(.*)', tornado.web.StaticFileHandler, {'path': static_root}),
]

settings = dict(
    debug=True,
    static_path=static_root
)

application = tornado.web.Application(handlers, **settings)

if __name__ == "__main__":
    port = 8090
    application.listen(port)
    print(f"Running: http://127.0.0.1:{port}")
    # IOLoop.current() replaces the deprecated IOLoop.instance() alias.
    tornado.ioloop.IOLoop.current().start()
我用来发送带有 JSON 数据的 POST 请求的代码:
import requests
# Send a sample Dialogflow-style payload to the locally running webhook
# and print what comes back.
url = 'http://127.0.0.1:8090/webhook'
data = {'queryResult': {'queryText': 'Hello World'}}
# json=data sends the dict as a JSON request body.
r = requests.post(url, json=data)
print(r.status_code)
print(r.headers.get('Content-Type'))
# Decode the response body as JSON.
print(r.json())
顺便说一句:在 Flask 中可以用 jsonify 简化(需要先 from flask import jsonify):
@app.route('/webhook', methods=['POST', 'GET'])
def webhook():
    """Shorter Flask view: let ``jsonify`` build the JSON response.

    The request body is read with ``get_json(silent=True, force=True)``,
    handed to ``makeWebhookResult()``, and the result is returned through
    ``jsonify``.
    """
    payload = request.get_json(silent=True, force=True)
    return jsonify(makeWebhookResult(payload))