Twisted SSE 服务器通过 pubsub 订阅 Redis
Twisted SSE server subscribed to Redis via pubsub
我正在尝试在 Twisted 中构建一个服务器,它可以让客户端使用服务器发送的事件进行连接。我希望此服务器也能监听 Redis,如果有消息,则将其推送到连接的 SSE 客户端。
我的 SSE 服务器正在运行。我知道如何订阅 Redis。我不知道如何让两件作品 运行 不互相阻塞。
我知道 https://github.com/leporo/tornado-redis and https://github.com/fiorix/txredisapi,相关问题中推荐了它。不知道这有什么帮助:/
如何解决这个问题?您能否在以下两个方面提供帮助:概念提示和代码片段?
我的 Twisted SSE 服务器代码:
# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor
class Subscribe(resource.Resource):
isLeaf = True
sse_conns = set()
def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.add_conn(request)
return server.NOT_DONE_YET
def add_conn(self, conn):
self.sse_conns.add(conn)
finished = conn.notifyFinish()
finished.addBoth(self.rm_conn)
def rm_conn(self, conn):
self.sse_conns.remove(conn)
def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}'\r\n'".format(event)
conn.write(event_line + '\r\n')
if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(9000, server.Site(sub))
reactor.run()
我的Redis订阅码:
import redis
redis = redis.StrictRedis.from_url('redis://localhost:6379')
class RedisSub(object):
def __init__(self):
self.pubsub = redis.pubsub()
self.pubsub.subscribe('foobar-channel')
def listen(self):
for item in self.pubsub.listen():
print str(item)
这对我有用。
我最终使用了 txredis 库,对 RedisClient 稍作改动(添加了最少的订阅功能)。
# coding: utf-8
import os
import sys
import weakref
from txredis.client import RedisClient
from twisted.web import server, resource
from twisted.internet import reactor, protocol, defer
from twisted.python import log
from utils import cors, redis_conf_from_url
log.startLogging(sys.stdout)
PORT = int(os.environ.get('PORT', 9000))
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
REDIS_SUB_CHANNEL = 'votes'
class RedisBroadcaster(RedisClient):
def subscribe(self, *channels):
self._send('SUBSCRIBE', *channels)
def handleCompleteMultiBulkData(self, reply):
if reply[0] == u"message":
message = reply[1:][1]
self.sse_connector.broadcast(message)
else:
super(RedisClient, self).handleCompleteMultiBulkData(reply)
@defer.inlineCallbacks
def redis_sub():
clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password'))
redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port'])
redis.subscribe(REDIS_SUB_CHANNEL)
class Subscribe(resource.Resource):
isLeaf = True
sse_conns = weakref.WeakSet()
@cors
def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.sse_conns.add(request)
return server.NOT_DONE_YET
def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}\r\n".format(event)
conn.write(event_line + '\r\n')
if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(PORT, server.Site(sub))
RedisBroadcaster.sse_connector = sub
reactor.callLater(0, redis_sub)
reactor.run()
我正在尝试在 Twisted 中构建一个服务器,它可以让客户端使用服务器发送的事件进行连接。我希望此服务器也能监听 Redis,如果有消息,则将其推送到连接的 SSE 客户端。
我的 SSE 服务器正在运行。我知道如何订阅 Redis。我不知道如何让两件作品 运行 不互相阻塞。
我知道 https://github.com/leporo/tornado-redis and https://github.com/fiorix/txredisapi,相关问题中推荐了它。不知道这有什么帮助:/
如何解决这个问题?您能否在以下两个方面提供帮助:概念提示和代码片段?
我的 Twisted SSE 服务器代码:
# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor
class Subscribe(resource.Resource):
isLeaf = True
sse_conns = set()
def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.add_conn(request)
return server.NOT_DONE_YET
def add_conn(self, conn):
self.sse_conns.add(conn)
finished = conn.notifyFinish()
finished.addBoth(self.rm_conn)
def rm_conn(self, conn):
self.sse_conns.remove(conn)
def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}'\r\n'".format(event)
conn.write(event_line + '\r\n')
if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(9000, server.Site(sub))
reactor.run()
我的Redis订阅码:
import redis
redis = redis.StrictRedis.from_url('redis://localhost:6379')
class RedisSub(object):
def __init__(self):
self.pubsub = redis.pubsub()
self.pubsub.subscribe('foobar-channel')
def listen(self):
for item in self.pubsub.listen():
print str(item)
这对我有用。
我最终使用了 txredis 库,对 RedisClient 稍作改动(添加了最少的订阅功能)。
# coding: utf-8
import os
import sys
import weakref
from txredis.client import RedisClient
from twisted.web import server, resource
from twisted.internet import reactor, protocol, defer
from twisted.python import log
from utils import cors, redis_conf_from_url
log.startLogging(sys.stdout)
PORT = int(os.environ.get('PORT', 9000))
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
REDIS_SUB_CHANNEL = 'votes'
class RedisBroadcaster(RedisClient):
def subscribe(self, *channels):
self._send('SUBSCRIBE', *channels)
def handleCompleteMultiBulkData(self, reply):
if reply[0] == u"message":
message = reply[1:][1]
self.sse_connector.broadcast(message)
else:
super(RedisClient, self).handleCompleteMultiBulkData(reply)
@defer.inlineCallbacks
def redis_sub():
clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password'))
redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port'])
redis.subscribe(REDIS_SUB_CHANNEL)
class Subscribe(resource.Resource):
isLeaf = True
sse_conns = weakref.WeakSet()
@cors
def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.sse_conns.add(request)
return server.NOT_DONE_YET
def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}\r\n".format(event)
conn.write(event_line + '\r\n')
if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(PORT, server.Site(sub))
RedisBroadcaster.sse_connector = sub
reactor.callLater(0, redis_sub)
reactor.run()