使用 tweepy 读取 Twitter 流,并借助 gevent 通过 websocket 提供内容

Consuming Twitter stream with tweepy and serving content via websocket with gevent

我的目标是使用 tweepy 读取 Twitter 流,并通过基于 gevent 的 websocket 把读到的内容提供出去。我已经借助官方文档中的两个简单脚本,分别解决了问题的两个部分。

1) 一方面,我可以使用 tweepy 读取 Twitter 流:

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json

# Twitter API credentials, intentionally left blank here.
# Consumer (application) key pair:
consumer_key = ""
consumer_secret = ""
# Access token pair:
access_token = ""
access_token_secret = ""

class myStreamListener(StreamListener):
    """Stream listener that prints the coordinates of geotagged messages."""

    def on_data(self, data):
        """Print the coordinates carried by one raw stream message, if any.

        ``data`` is the JSON text of a single stream message. Messages whose
        decoded form has no "coordinates" entry, or a null one, are skipped.
        Returns True in every case.
        """
        decoded = json.loads(data)
        # Use .get() instead of indexing: a decoded message without a
        # "coordinates" key would otherwise raise KeyError.
        coordinates = decoded.get("coordinates")
        if coordinates is not None:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print(coordinates["coordinates"])
        return True

    def on_error(self, status):
        """Print the error status passed in by the stream."""
        print(status)

if __name__ == '__main__':
    # Build the OAuth handler from the module-level credentials.
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    # Bounding box used as the `locations` filter; the values span the full
    # longitude (-180..180) and latitude (-90..90) ranges.
    geobox_world = [-180, -90, 180, 90]

    # Attach the printing listener to an authenticated stream and start
    # filtering by location.
    listener = myStreamListener()
    stream = Stream(auth, listener)
    stream.filter(locations=geobox_world)

这会在标准输出中打印我需要的信息

2) 另一方面,我改写了 gevent 的一个简单示例,让一个 'thread' 能够向 Web 客户端提供数据:

from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi
import gevent
import time

def app(environ, start_response):
    """WSGI app that sends a numbered greeting over the websocket every second.

    The websocket is read from ``environ['wsgi.websocket']``. The messages
    "hola0", "hola1", ... are sent in an endless loop, so the function only
    finishes when ``ws.send`` raises. ``start_response`` is not used.
    """
    ws = environ['wsgi.websocket']
    contador = 0
    while True:
        ws.send("hola" + str(contador))
        # gevent.sleep() yields to the other greenlets while waiting. This
        # script does not monkey-patch the stdlib, so time.sleep() would
        # block the whole server for the duration of the pause.
        gevent.sleep(1)
        contador += 1

# Create a WSGI server bound to port 10000 (empty host string) that runs `app`
# and uses WebSocketHandler as its request handler class.
server = pywsgi.WSGIServer(('', 10000), app, handler_class=WebSocketHandler)
# Start serving; this is the last statement of the script.
server.serve_forever()

我面临的问题是如何把这两段代码结合起来:我无法让 myStreamListener 在 tweepy 的 'thread' 中运行。

我该怎么做?一种方法可能是创建一个中间缓冲区,但这超出了我的 Python 水平。

StreamListener 在 gevent greenlet 上运行,并将解析后的坐标发送到连接到服务器的所有 websockets。

测试 iocat localhost:10000

import gevent
import gevent.monkey
gevent.monkey.patch_all()

from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json

# Twitter API credentials, intentionally left blank here.
# Consumer (application) key pair:
consumer_key = ""
consumer_secret = ""
# Access token pair:
access_token = ""
access_token_secret = ""


class MyStreamListener(StreamListener):
    """Tweepy listener that forwards tweet coordinates to websockets.

    The listener owns its own ``Stream`` and a list of registered
    websockets (``self.sockets``). Each stream message that carries
    coordinates is JSON-encoded and sent to every registered websocket,
    one greenlet per send.
    """

    def __init__(self):
        """Set up the socket list and an authenticated stream bound to self."""
        # Run the base-class initialiser so inherited state is set up too.
        super(MyStreamListener, self).__init__()
        self.sockets = []
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self.stream = Stream(auth, self)

    def add_socket(self, ws):
        """Register websocket ``ws`` so it receives future coordinates."""
        self.sockets.append(ws)

    def run(self):
        """Filter the stream for "#linux"; disconnect if anything raises."""
        try:
            # `track` is a sequence of phrases. tweepy comma-joins its items,
            # so a bare string would be split into single characters.
            self.stream.filter(track=["#linux"])
        except Exception as exc:
            # Report the failure instead of swallowing it silently.
            print("Stream stopped: %s" % (exc,))
            self.stream.disconnect()

    def start(self):
        """Spawn ``run`` in a gevent greenlet."""
        gevent.spawn(self.run)

    def send(self, ws, coordinates):
        """Send ``coordinates`` as JSON to ``ws``; unregister ``ws`` on failure."""
        try:
            ws.send(json.dumps(coordinates))
        except Exception:
            # The websocket died. Several sends for the same socket can fail
            # concurrently, so tolerate it having been removed already.
            try:
                self.sockets.remove(ws)
            except ValueError:
                pass

    def on_data(self, data):
        """Broadcast the coordinates of one raw stream message, if it has any.

        ``data`` is the JSON text of a single stream message. Returns True
        in every case.
        """
        decoded = json.loads(data)
        geo = decoded.get("coordinates")
        if geo is not None:
            coordinates = geo["coordinates"]
            # Iterate over a snapshot: send() may remove entries from
            # self.sockets.
            for ws in list(self.sockets):
                gevent.spawn(self.send, ws, coordinates)
        return True

    def on_error(self, status):
        """Print the error status passed in by the stream."""
        # Formatted so the output is "Error <status>" on Python 2 and 3 alike.
        print("Error %s" % (status,))

    def on_timeout(self):
        """Print a notice and pause for 30 seconds (cooperatively)."""
        print("tweepy timeout.. wait 30 seconds")
        gevent.sleep(30)

# Single module-level listener; app() registers every incoming websocket on it.
stream_listener = MyStreamListener()
# start() spawns the listener's run() method in a gevent greenlet.
stream_listener.start()


def app(environ, start_response):
    """WSGI app that registers the websocket with the shared stream listener.

    The websocket is read from ``environ['wsgi.websocket']`` and added to
    ``stream_listener``. The handler then idles until the websocket reports
    itself closed. ``start_response`` is not used.
    """
    ws = environ['wsgi.websocket']
    stream_listener.add_socket(ws)
    try:
        # Keep the handler alive while the connection is open. The data
        # itself is pushed by the listener, not from here.
        while not ws.closed:
            gevent.sleep(0.1)
    finally:
        # Unregister the socket as soon as the connection ends, rather than
        # leaving it in the list until some later send fails.
        try:
            stream_listener.sockets.remove(ws)
        except ValueError:
            # Already removed by the listener after a failed send.
            pass

# Create a WSGI server bound to port 10000 (empty host string) that runs `app`
# and uses WebSocketHandler as its request handler class.
server = pywsgi.WSGIServer(('', 10000), app, handler_class=WebSocketHandler)
# Start serving; this is the last statement of the script.
server.serve_forever()