Redis Pubsub 和消息队列
Redis Pubsub and Message Queueing
我的总体问题是:将 Redis 用于 PubSub,当发布者将消息推送到频道的速度快于订阅者能够读取它们的速度时,消息会发生什么情况?
例如,假设我有:
- 一个简单的发布者以 2 msg/sec 的速率发布消息。
- 一个简单的订阅者以 1 的速率读取消息 msg/sec。
我天真的假设订阅者只会看到发布到 Redis 上的消息的 50%。为了验证这个理论,我写了两个脚本:
pub.py
queue = redis.StrictRedis(host='localhost', port=6379, db=0)
channel = queue.pubsub()
for i in range(10):
queue.publish("test", i)
time.sleep(0.5)
sub.py
r = redis.StrictRedis(host='localhost', port=6379, db=0)
p = r.pubsub()
p.subscribe('test')
while True:
message = p.get_message()
if message:
print "Subscriber: %s" % message['data']
time.sleep(1)
结果
- 当我先运行
sub.py
,紧接着运行pub.py
时,我发现sub.py
实际上显示了所有消息(1-10),一个接一个,延迟为中间间隔1秒。我最初的假设是错误的,Redis 正在排队消息。需要更多测试。
- 当我先运行
pub.py
,然后等待5秒再运行sub.py
时,我发现sub.py
只显示了消息的后半部分(5-10)。我最初会假设这是,但根据我以前的结果,我会认为消息已排队,这使我得出以下结论......
结论
- Redis 服务器似乎为每个客户端、每个通道排队消息。
- 只要客户端在收听,读取消息的速度有多快并不重要。只要它处于连接状态,消息就会为该客户端、该通道保持排队状态。
剩余问题
- 这些结论是否有效?
- 如果是这样,client/channel 条消息将保持排队多长时间?
- 如果是这样,是否有一个
redis-cli info
命令来查看有多少消息排队(每个 client/channel)?
测试有效,但结论部分错误。
Redis 不会在 pub/sub 通道上排队任何内容。相反,它倾向于从发布者套接字读取项目,然后将项目写入所有订阅者套接字,最好是在事件循环的同一迭代中。 Redis 数据结构中没有任何内容。
现在,正如您所展示的,仍然存在某种缓冲。这是由于使用了 TCP/IP 套接字和 Redis 通信缓冲区。
Sockets有缓冲区,当然TCP也自带一些流控机制。它避免了缓冲区已满时的数据丢失。如果订阅者的速度不够快,数据就会堆积在它的套接字缓冲区中。当它已满时,TCP 将阻止通信并阻止 Redis 在套接字中推送更多信息。
Redis 还管理输出通信缓冲区(在套接字缓冲区之上)以生成使用 Redis 协议格式化的数据。所以当套接字的输出缓冲区已满时,事件循环会将套接字标记为不可写,数据将保留在 Redis 输出缓冲区中。
只要 TCP 连接仍然有效,数据可以在缓冲区中保留很长时间。现在,套接字和 Redis 输出缓冲区都已绑定。如果订阅者实在是太慢了,堆积了很多数据,Redis最终会关闭与订阅者的连接(作为安全机制)。
默认情况下,对于 pub/sub,Redis 每个连接缓冲区的软限制为 8 MB,硬限制为 32 MB。如果输出缓冲区达到硬限制,或者保持在软硬限制之间超过 60 秒,将关闭与慢速订阅者的连接。
了解待处理消息的数量并不容易。可以通过查看套接字缓冲区和 Redis 输出缓冲区中未决信息的大小来评估它。
对于 Redis 输出缓冲区,您可以使用 CLIENT LIST command(来自 redis-cli)。输出缓冲区的大小在 obl 和 oll 字段中返回(以字节为单位)。
对于套接字缓冲区,没有 Redis 命令。但是,在 Linux 上,可以构建一个脚本来解释 /proc/net/tcp 文件的内容。请参阅示例 here。此脚本可能需要适应您的系统。
我的总体问题是:将 Redis 用于 PubSub,当发布者将消息推送到频道的速度快于订阅者能够读取它们的速度时,消息会发生什么情况?
例如,假设我有:
- 一个简单的发布者以 2 msg/sec 的速率发布消息。
- 一个简单的订阅者以 1 的速率读取消息 msg/sec。
我天真的假设订阅者只会看到发布到 Redis 上的消息的 50%。为了验证这个理论,我写了两个脚本:
pub.py
queue = redis.StrictRedis(host='localhost', port=6379, db=0)
channel = queue.pubsub()
for i in range(10):
queue.publish("test", i)
time.sleep(0.5)
sub.py
r = redis.StrictRedis(host='localhost', port=6379, db=0)
p = r.pubsub()
p.subscribe('test')
while True:
message = p.get_message()
if message:
print "Subscriber: %s" % message['data']
time.sleep(1)
结果
- 当我先运行
sub.py
,紧接着运行pub.py
时,我发现sub.py
实际上显示了所有消息(1-10),一个接一个,延迟为中间间隔1秒。我最初的假设是错误的,Redis 正在排队消息。需要更多测试。 - 当我先运行
pub.py
,然后等待5秒再运行sub.py
时,我发现sub.py
只显示了消息的后半部分(5-10)。我最初会假设这是,但根据我以前的结果,我会认为消息已排队,这使我得出以下结论......
结论
- Redis 服务器似乎为每个客户端、每个通道排队消息。
- 只要客户端在收听,读取消息的速度有多快并不重要。只要它处于连接状态,消息就会为该客户端、该通道保持排队状态。
剩余问题
- 这些结论是否有效?
- 如果是这样,client/channel 条消息将保持排队多长时间?
- 如果是这样,是否有一个
redis-cli info
命令来查看有多少消息排队(每个 client/channel)?
测试有效,但结论部分错误。
Redis 不会在 pub/sub 通道上排队任何内容。相反,它倾向于从发布者套接字读取项目,然后将项目写入所有订阅者套接字,最好是在事件循环的同一迭代中。 Redis 数据结构中没有任何内容。
现在,正如您所展示的,仍然存在某种缓冲。这是由于使用了 TCP/IP 套接字和 Redis 通信缓冲区。
Sockets有缓冲区,当然TCP也自带一些流控机制。它避免了缓冲区已满时的数据丢失。如果订阅者的速度不够快,数据就会堆积在它的套接字缓冲区中。当它已满时,TCP 将阻止通信并阻止 Redis 在套接字中推送更多信息。
Redis 还管理输出通信缓冲区(在套接字缓冲区之上)以生成使用 Redis 协议格式化的数据。所以当套接字的输出缓冲区已满时,事件循环会将套接字标记为不可写,数据将保留在 Redis 输出缓冲区中。
只要 TCP 连接仍然有效,数据可以在缓冲区中保留很长时间。现在,套接字和 Redis 输出缓冲区都已绑定。如果订阅者实在是太慢了,堆积了很多数据,Redis最终会关闭与订阅者的连接(作为安全机制)。
默认情况下,对于 pub/sub,Redis 每个连接缓冲区的软限制为 8 MB,硬限制为 32 MB。如果输出缓冲区达到硬限制,或者保持在软硬限制之间超过 60 秒,将关闭与慢速订阅者的连接。
了解待处理消息的数量并不容易。可以通过查看套接字缓冲区和 Redis 输出缓冲区中未决信息的大小来评估它。
对于 Redis 输出缓冲区,您可以使用 CLIENT LIST command(来自 redis-cli)。输出缓冲区的大小在 obl 和 oll 字段中返回(以字节为单位)。
对于套接字缓冲区,没有 Redis 命令。但是,在 Linux 上,可以构建一个脚本来解释 /proc/net/tcp 文件的内容。请参阅示例 here。此脚本可能需要适应您的系统。