Twisted SSE 服务器通过 pubsub 订阅 Redis

Twisted SSE server subscribed to Redis via pubsub

我正在尝试在 Twisted 中构建一个服务器,它可以让客户端使用服务器发送的事件进行连接。我希望此服务器也能监听 Redis,如果有消息,则将其推送到连接的 SSE 客户端。

我的 SSE 服务器正在运行。我知道如何订阅 Redis。我不知道如何让两件作品 运行 不互相阻塞。

我知道 https://github.com/leporo/tornado-redis and https://github.com/fiorix/txredisapi,相关问题中推荐了它。不知道这有什么帮助:/

如何解决这个问题?您能否在以下两个方面提供帮助:概念提示和代码片段?

我的 Twisted SSE 服务器代码:

# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor


class Subscribe(resource.Resource):
    isLeaf = True
    sse_conns = set()

    def render_GET(self, request):
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.write("")
        self.add_conn(request)
        return server.NOT_DONE_YET

    def add_conn(self, conn):
        self.sse_conns.add(conn)
        finished = conn.notifyFinish()
        finished.addBoth(self.rm_conn)

    def rm_conn(self, conn):
        self.sse_conns.remove(conn)

    def broadcast(self, event):
        for conn in self.sse_conns:
            event_line = "data: {}'\r\n'".format(event)
            conn.write(event_line + '\r\n')


if __name__ == "__main__":
    sub = Subscribe()
    reactor.listenTCP(9000, server.Site(sub))
    reactor.run()

我的Redis订阅码:

import redis


redis = redis.StrictRedis.from_url('redis://localhost:6379')


class RedisSub(object):
    def __init__(self):
        self.pubsub = redis.pubsub()
        self.pubsub.subscribe('foobar-channel')

    def listen(self):
        for item in self.pubsub.listen():
            print str(item)

这对我有用。

我最终使用了 txredis 库,对 RedisClient 稍作改动(添加了最少的订阅功能)。

# coding: utf-8
import os
import sys
import weakref

from txredis.client import RedisClient

from twisted.web import server, resource
from twisted.internet import reactor, protocol, defer
from twisted.python import log

from utils import cors, redis_conf_from_url


log.startLogging(sys.stdout)

PORT = int(os.environ.get('PORT', 9000))
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
REDIS_SUB_CHANNEL = 'votes'


class RedisBroadcaster(RedisClient):
    def subscribe(self, *channels):
        self._send('SUBSCRIBE', *channels)

    def handleCompleteMultiBulkData(self, reply):
        if reply[0] == u"message":
            message = reply[1:][1]
            self.sse_connector.broadcast(message)
        else:
            super(RedisClient, self).handleCompleteMultiBulkData(reply)


@defer.inlineCallbacks
def redis_sub():
    clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password'))
    redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port'])
    redis.subscribe(REDIS_SUB_CHANNEL)


class Subscribe(resource.Resource):
    isLeaf = True
    sse_conns = weakref.WeakSet()

    @cors
    def render_GET(self, request):
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.write("")
        self.sse_conns.add(request)
        return server.NOT_DONE_YET

    def broadcast(self, event):
        for conn in self.sse_conns:
            event_line = "data: {}\r\n".format(event)
            conn.write(event_line + '\r\n')


if __name__ == "__main__":
    sub = Subscribe()
    reactor.listenTCP(PORT, server.Site(sub))

    RedisBroadcaster.sse_connector = sub    
    reactor.callLater(0, redis_sub)

    reactor.run()