Flask Socket-IO 服务器客户端不通信(使用 tweepy 和 twilio)
Flask Socket-IO Server Client Not Communicating (with tweepy and twilio)
我正在尝试拥有一个允许我启动 tweepy 流的 Flask 服务器,并且在流侦听器中收到的每条消息上,它都会将该消息发送到 socketio 客户端。 Flask 服务器同时应该允许 Twilio post 到它,并将该消息路由到客户端——以便客户端从 Twilio 和 twitter 接收消息。
我一直在尝试让服务器将消息发送到客户端以获取来自 Twitter 的数据,Twilio 的代码工作正常。它在收到消息时将数据发送到客户端。 tweepy 中的主循环也没有锁定程序——我可以测试打印语句并查看推文和收到的短信在 handle_message(msg)
函数中异步打印。我觉得一定有一些非常简单的东西我在这里遗漏了,因为 SMS 被发送到客户端,但传入的推文却没有,即使它们正在传播到 handle_message(msg)
函数。给出了什么?
server.py
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/path/to/credentials/'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, cors_allowed_origins="*")
auth = tweepy.OAuthHandler(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'])
auth.set_access_token(credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
api = tweepy.API(auth)
class MyListener(tweepy.StreamListener):
def on_status(self, status):
print('status')
def on_data(self, data):
handle_message(data)
def on_error(self, status):
print('error')
print(status)
stream_listener = MyListener()
# twilio sms route
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
# flask-socketio stuff
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#mysearchterm"
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=[search_term], is_async=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client Diconnected')
@sio.event
def handle_message(message):
print("This is the message received: ", message)
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)
client.py
import socketio
client = socketio.Client()
@client.on('client_connected')
def on_connect(message):
print(message)
@client.on('handle_message')
def message(data):
print(data)
client.connect('http://localhost:5000/')
这里是 Twilio 开发人员布道者。
您已将 handle_message
函数装饰为 @sio.event
,但是 as far as I can see in the docs,您应该这样做只是为了让 handle_message
方法响应套接字上调用的事件“handle_message”。
我将开始删除 @sio.event
装饰器。
我不是Python专家,但我也想知道这里是否存在范围问题。您定义 MyListener
class 并在定义 handle_message
方法之前创建它的一个实例。只是为了测试,您可以尝试直接在 on_data
方法中发送到套接字:
def on_data(self, data):
sio.emit('handle_message', data)
如果可行,请考虑将 handle_message
的定义移动到 MyListener
的定义之上。
我的问题解决了!正如我在此 中指出的那样,问题在于多线程和线程之间的信息传递。使用 tweepy,参数 is_async=True
,在 4.1.0 中是 threading=True
,一旦流是 运行.
,就会打开一个新线程
我没有尝试处理四处传递的信息,而是通过 using a local redis server as a message queue (start from the section "Using Multiple Workers" if you are setting this up for the first time, also be sure to install redis) 利用现存的 flask-socketio 功能。
这是更新后的 server.py
代码。 client.py
代码基本保持不变:
import eventlet
eventlet.monkey_patch()
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/PATH/TO/CREDENTIALS'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, message_queue='redis://', cors_allowed_origins="*")
class MyStream(tweepy.Stream):
def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
super(MyStream, self).__init__(consumer_key, consumer_secret, access_token, access_secret)
self.stream_sio = SocketIO(message_queue='redis://')
def on_status(self, status):
print('status')
def on_data(self, data):
json_data = json.loads(data)
self.stream_sio.emit('handle_message', json_data['text'])
# TODO: Send along all necessary information
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#testingtesting123"
stream = MyStream(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'],
credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
stream.filter(track=[search_term], threaded=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client disconnected')
def handle_message(message):
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)
我正在尝试拥有一个允许我启动 tweepy 流的 Flask 服务器,并且在流侦听器中收到的每条消息上,它都会将该消息发送到 socketio 客户端。 Flask 服务器同时应该允许 Twilio post 到它,并将该消息路由到客户端——以便客户端从 Twilio 和 twitter 接收消息。
我一直在尝试让服务器将消息发送到客户端以获取来自 Twitter 的数据,Twilio 的代码工作正常。它在收到消息时将数据发送到客户端。 tweepy 中的主循环也没有锁定程序——我可以测试打印语句并查看推文和收到的短信在 handle_message(msg)
函数中异步打印。我觉得一定有一些非常简单的东西我在这里遗漏了,因为 SMS 被发送到客户端,但传入的推文却没有,即使它们正在传播到 handle_message(msg)
函数。给出了什么?
server.py
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/path/to/credentials/'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, cors_allowed_origins="*")
auth = tweepy.OAuthHandler(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'])
auth.set_access_token(credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
api = tweepy.API(auth)
class MyListener(tweepy.StreamListener):
def on_status(self, status):
print('status')
def on_data(self, data):
handle_message(data)
def on_error(self, status):
print('error')
print(status)
stream_listener = MyListener()
# twilio sms route
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
# flask-socketio stuff
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#mysearchterm"
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=[search_term], is_async=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client Diconnected')
@sio.event
def handle_message(message):
print("This is the message received: ", message)
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)
client.py
import socketio
client = socketio.Client()
@client.on('client_connected')
def on_connect(message):
print(message)
@client.on('handle_message')
def message(data):
print(data)
client.connect('http://localhost:5000/')
这里是 Twilio 开发人员布道者。
您已将 handle_message
函数装饰为 @sio.event
,但是 as far as I can see in the docs,您应该这样做只是为了让 handle_message
方法响应套接字上调用的事件“handle_message”。
我将开始删除 @sio.event
装饰器。
我不是Python专家,但我也想知道这里是否存在范围问题。您定义 MyListener
class 并在定义 handle_message
方法之前创建它的一个实例。只是为了测试,您可以尝试直接在 on_data
方法中发送到套接字:
def on_data(self, data):
sio.emit('handle_message', data)
如果可行,请考虑将 handle_message
的定义移动到 MyListener
的定义之上。
我的问题解决了!正如我在此 is_async=True
,在 4.1.0 中是 threading=True
,一旦流是 运行.
我没有尝试处理四处传递的信息,而是通过 using a local redis server as a message queue (start from the section "Using Multiple Workers" if you are setting this up for the first time, also be sure to install redis) 利用现存的 flask-socketio 功能。
这是更新后的 server.py
代码。 client.py
代码基本保持不变:
import eventlet
eventlet.monkey_patch()
from flask import Flask, json, request
from twilio.twiml.messaging_response import Message, MessagingResponse
from flask_socketio import SocketIO
import tweepy
import json
PATH = '/PATH/TO/CREDENTIALS'
with open(PATH, "r") as file:
credentials = json.load(file)
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'abc123'
sio = SocketIO(app, message_queue='redis://', cors_allowed_origins="*")
class MyStream(tweepy.Stream):
def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
super(MyStream, self).__init__(consumer_key, consumer_secret, access_token, access_secret)
self.stream_sio = SocketIO(message_queue='redis://')
def on_status(self, status):
print('status')
def on_data(self, data):
json_data = json.loads(data)
self.stream_sio.emit('handle_message', json_data['text'])
# TODO: Send along all necessary information
@app.route('/sms', methods=['POST'])
def sms():
number = request.form['From']
message_body = request.form['Body']
message_data = {"number": number, "msg": message_body}
resp = MessagingResponse()
resp.message('Hello {}, you said: {}'.format(number, message_body))
handle_message(message_data)
return str(resp)
@sio.on('connect')
def connect():
print('connected')
sio.emit('client_connected', "you connected")
search_term = "#testingtesting123"
stream = MyStream(credentials['CONSUMER_KEY'], credentials['CONSUMER_SECRET'],
credentials['ACCESS_TOKEN'], credentials['ACCESS_SECRET'])
stream.filter(track=[search_term], threaded=True)
sio.emit('client_connected', "the search term is {}".format(search_term))
@sio.on('disconnect')
def disconnect():
print('Client disconnected')
def handle_message(message):
sio.emit('handle_message', message)
if __name__ == '__main__':
sio.run(app)