NATS / JetStream "remember" 订阅者如何?
How does NATS / JetStream "remember" subscribers?
我正在使用 NATS 迈出第一步,看到了我无法理解的行为,即使在非常仔细地阅读了文档之后也是如此。我有一个本地 NATS 服务器 (2.6.5) 运行ning。它开始于
./nats-server -js
我使用以下代码生成一些消息:
async def main():
nc = await nats.connect()
js = nc.jetstream()
await js.delete_stream(name="hello")
await js.add_stream(
name="hello",
subjects=["hello"],
)
for i in range(0, 10):
await js.publish("hello", f"hello world: {i}".encode())
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
如果我 运行 代码并执行 ./nats stream ls
我会看到 10 条消息。到目前为止一切都很好。接下来我 运行 我的消费者:
async def main():
nc = await nats.connect()
js = nc.jetstream()
sub = await js.pull_subscribe("hello", "hello")
msg_count = 0
while msg_count < 10:
for msg in await sub.fetch(1):
print("Received:", msg.data)
msg_count = msg_count + 1
# Try nack'ing every third message
if msg_count % 3 == 0:
await msg.nak()
else:
await msg.ack()
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
输出显示:
Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'
这是有道理的:我提取了 10 条消息。每三条消息都是“裸”消息,因此会在下一次调用时再次检索到。如果我再次启动脚本,输出为:
Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'
几秒钟后我暂停了。显然 NATS 以某种方式记住了我的脚本并继续传递消息。但我不明白这是怎么发生的!?流中是否有“全局”游标?但在那种情况下,多个客户会干扰,这对我来说没有意义。所以我假设 NATS 以某种方式记住了我的客户。如何?我如何告诉 NATS 我想重新启动?我也很感激指向我显然错过的文档的指针!?
创建请求订阅时,jetstream 客户端 API 还会创建一个 durable 具有匹配消费者选项的消费者,在本例中是流名称和 durable name(第二个参数)。
sub = await js.pull_subscribe("hello", "hello")
A durable consumer is intended to be long-lived and the server will
keep track of where the consumer is in the stream. So if the consumer
stops then restarts, it will automatically restart where it left off
and the configuration used to initialize the consumer will be
remembered. A durable consumer is required when making a pull
subscription and is optional when making a push subscription.
来源:https://nats.io/blog/jetstream-java-client-03-consume/#durable-vs-ephemeral
我正在使用 NATS 迈出第一步,看到了我无法理解的行为,即使在非常仔细地阅读了文档之后也是如此。我有一个本地 NATS 服务器 (2.6.5) 运行ning。它开始于
./nats-server -js
我使用以下代码生成一些消息:
async def main():
nc = await nats.connect()
js = nc.jetstream()
await js.delete_stream(name="hello")
await js.add_stream(
name="hello",
subjects=["hello"],
)
for i in range(0, 10):
await js.publish("hello", f"hello world: {i}".encode())
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
如果我 运行 代码并执行 ./nats stream ls
我会看到 10 条消息。到目前为止一切都很好。接下来我 运行 我的消费者:
async def main():
nc = await nats.connect()
js = nc.jetstream()
sub = await js.pull_subscribe("hello", "hello")
msg_count = 0
while msg_count < 10:
for msg in await sub.fetch(1):
print("Received:", msg.data)
msg_count = msg_count + 1
# Try nack'ing every third message
if msg_count % 3 == 0:
await msg.nak()
else:
await msg.ack()
await nc.close()
if __name__ == "__main__":
asyncio.run(main())
输出显示:
Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'
这是有道理的:我提取了 10 条消息。每三条消息都是“裸”消息,因此会在下一次调用时再次检索到。如果我再次启动脚本,输出为:
Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'
几秒钟后我暂停了。显然 NATS 以某种方式记住了我的脚本并继续传递消息。但我不明白这是怎么发生的!?流中是否有“全局”游标?但在那种情况下,多个客户会干扰,这对我来说没有意义。所以我假设 NATS 以某种方式记住了我的客户。如何?我如何告诉 NATS 我想重新启动?我也很感激指向我显然错过的文档的指针!?
创建请求订阅时,jetstream 客户端 API 还会创建一个 durable 具有匹配消费者选项的消费者,在本例中是流名称和 durable name(第二个参数)。
sub = await js.pull_subscribe("hello", "hello")
A durable consumer is intended to be long-lived and the server will keep track of where the consumer is in the stream. So if the consumer stops then restarts, it will automatically restart where it left off and the configuration used to initialize the consumer will be remembered. A durable consumer is required when making a pull subscription and is optional when making a push subscription.
来源:https://nats.io/blog/jetstream-java-client-03-consume/#durable-vs-ephemeral