如何使用 GCP 平台流式传输事件?
How to stream events with GCP platform?
我正在研究构建一个简单的解决方案,其中生产者服务将事件推送到消息队列,然后让流服务通过 gRPC 流使这些可用 API。
Cloud Pub/Sub 似乎非常适合这项工作,但扩展流媒体服务意味着该服务的每个副本都需要创建自己的订阅并在缩小规模之前将其删除,这似乎不必要地复杂而且不是什么该平台旨在用于。
另一方面,Kafka 似乎很适合这样的事情,但我想避免自己管理底层平台,而是利用云基础设施。
我还应该提到,进行流式传输的原因 API 是为了允许流式传输到前端(可能无法访问底层基础设施)
有没有更好的方法来使用 GCP 平台执行类似的操作,而无需部署和管理我自己的基础架构?
如果您本质上想要临时订阅,那么您可以在创建订阅时在 Subscription 对象上设置一些内容:
将 expiration_policy
设置为更小的持续时间。当订阅者在该时间段内未收到消息时,订阅将被删除。权衡是,如果您的订阅者由于持续时间超过此时间段的暂时性问题而关闭,则订阅将被删除。默认情况下,有效期为 31 天。您可以将其设置为低至 1 天。对于拉式订阅者,订阅者只需停止向 Cloud Pub/Sub 发出请求,以便计时器在到期时启动。对于推送订阅,计时器根据没有消息成功传递到端点的时间启动。因此,如果没有发布任何消息,或者端点为所有推送的消息返回错误,则计时器生效。
减少message_retention_duration
的值。这是在订户未接收消息并确认消息的情况下保留消息的时间段。默认情况下,这是 7 天。您可以将其设置为低至 10 分钟。权衡是,如果您的订阅者断开连接或处理消息的时间超过此持续时间,则早于该时间的消息将被删除并且订阅者将看不到它们。
干净关闭的订阅者可能只需要自己调用DeleteSubscription,这样订阅就会立即消失,但是对于意外关闭的订阅者,设置这两个属性将最大限度地减少订阅继续存在的时间和数量将保留的消息(永远不会传递)的数量。
请记住,Cloud Pub/Sub quotas每个主题和每个项目的订阅数量限制为 1 到 10,000 个。因此,如果创建了大量订阅并且处于活动状态或未清理(手动或在 expiration_policy
的 ttl
过后自动清除),则可能无法创建新订阅。
我认为您最初的想法比临时订阅要好。我的意思是它有效,但感觉完全不自然。取决于您的要求。例如,客户端是只需要在连接时接收消息,还是都需要获取所有消息?
仅在连接时
我觉得你最初的想法更好。我可能会做的是创建一个客户端可以连接到的 gRPC 流服务。该实现本质上是一种观察者模式。消费者将收到一条消息,然后遍历订阅者以向所有订阅者执行“发送”。从那里开始,任何时候客户端连接到服务时,它都会在该观察者集合中注册自己,并在断开连接时取消注册。水平扩展是被动的,因为客户端会粘附到他们连接的任何实例。
每个人都会收到消息,如果最终
概念与上述类似,但客户端不会在断开连接时隐式地从观察者处取消注册。相反,它会显式注册和取消注册(通过 method/command 设计来这样做)。修改'on disconnected'逻辑,告诉观察者列表客户端下线了。然后消费者的广播逻辑略有不同。现在它遍历列表并说“如果在线,则发送,否则排队”,并将消息发送到临时队列(属于客户端)。然后,您的 'on connect' 逻辑会将队列中的所有消息发送给客户端,然后再通知消费者它已恢复在线。基本上是一个收件箱。在像 RabbitMQ 这样的大多数产品中,设置临时的、自删除的队列真的很容易。我认为你必须做一些管理是否可以删除队列。例如,永远不要删除队列,除非客户端明确取消订阅或长时间处于非活动状态。如果做不到这一点,整个收件箱的想法就会分崩离析。
上面选择的答案与我在这里订阅的最相似,因为订阅是队列。如果我这样做,那么我可能会将其实现为内部总线而不是观察者(因为这是不必要的)——您根据需要为连接的客户端创建一个消费者,它实际上只是转发消息。消息消费者根据客户端是否连接来订阅和取消订阅。正如 Kamal 指出的那样,如果您的规模超过 pubsub 允许的最大订阅数,您将 运行 遇到问题。如果您发现自己处于那个位置,那么您可以通过实施上述模式来解除这种限制。它基本上是相同的模式,但你将责任转移到你的基础设施上,唯一的限制是你自己的资源。
gRPC 使这种机制变得非常简单。或者,对于 Web,如果您使用的是 Microsoft 堆栈,那么 SignalR 也可以让这变得非常简单。客户端连接到集线器,您可以发布到所有连接的客户端。这里的消费者模式基本保持不变,但您不必手动实现观察者模式。
(注:图中箭头指向的是依赖方向,不是数据流向)
我正在研究构建一个简单的解决方案,其中生产者服务将事件推送到消息队列,然后让流服务通过 gRPC 流使这些可用 API。
Cloud Pub/Sub 似乎非常适合这项工作,但扩展流媒体服务意味着该服务的每个副本都需要创建自己的订阅并在缩小规模之前将其删除,这似乎不必要地复杂而且不是什么该平台旨在用于。
另一方面,Kafka 似乎很适合这样的事情,但我想避免自己管理底层平台,而是利用云基础设施。
我还应该提到,进行流式传输的原因 API 是为了允许流式传输到前端(可能无法访问底层基础设施)
有没有更好的方法来使用 GCP 平台执行类似的操作,而无需部署和管理我自己的基础架构?
如果您本质上想要临时订阅,那么您可以在创建订阅时在 Subscription 对象上设置一些内容:
将
expiration_policy
设置为更小的持续时间。当订阅者在该时间段内未收到消息时,订阅将被删除。权衡是,如果您的订阅者由于持续时间超过此时间段的暂时性问题而关闭,则订阅将被删除。默认情况下,有效期为 31 天。您可以将其设置为低至 1 天。对于拉式订阅者,订阅者只需停止向 Cloud Pub/Sub 发出请求,以便计时器在到期时启动。对于推送订阅,计时器根据没有消息成功传递到端点的时间启动。因此,如果没有发布任何消息,或者端点为所有推送的消息返回错误,则计时器生效。减少
message_retention_duration
的值。这是在订户未接收消息并确认消息的情况下保留消息的时间段。默认情况下,这是 7 天。您可以将其设置为低至 10 分钟。权衡是,如果您的订阅者断开连接或处理消息的时间超过此持续时间,则早于该时间的消息将被删除并且订阅者将看不到它们。
干净关闭的订阅者可能只需要自己调用DeleteSubscription,这样订阅就会立即消失,但是对于意外关闭的订阅者,设置这两个属性将最大限度地减少订阅继续存在的时间和数量将保留的消息(永远不会传递)的数量。
请记住,Cloud Pub/Sub quotas每个主题和每个项目的订阅数量限制为 1 到 10,000 个。因此,如果创建了大量订阅并且处于活动状态或未清理(手动或在 expiration_policy
的 ttl
过后自动清除),则可能无法创建新订阅。
我认为您最初的想法比临时订阅要好。我的意思是它有效,但感觉完全不自然。取决于您的要求。例如,客户端是只需要在连接时接收消息,还是都需要获取所有消息?
仅在连接时
我觉得你最初的想法更好。我可能会做的是创建一个客户端可以连接到的 gRPC 流服务。该实现本质上是一种观察者模式。消费者将收到一条消息,然后遍历订阅者以向所有订阅者执行“发送”。从那里开始,任何时候客户端连接到服务时,它都会在该观察者集合中注册自己,并在断开连接时取消注册。水平扩展是被动的,因为客户端会粘附到他们连接的任何实例。
每个人都会收到消息,如果最终
概念与上述类似,但客户端不会在断开连接时隐式地从观察者处取消注册。相反,它会显式注册和取消注册(通过 method/command 设计来这样做)。修改'on disconnected'逻辑,告诉观察者列表客户端下线了。然后消费者的广播逻辑略有不同。现在它遍历列表并说“如果在线,则发送,否则排队”,并将消息发送到临时队列(属于客户端)。然后,您的 'on connect' 逻辑会将队列中的所有消息发送给客户端,然后再通知消费者它已恢复在线。基本上是一个收件箱。在像 RabbitMQ 这样的大多数产品中,设置临时的、自删除的队列真的很容易。我认为你必须做一些管理是否可以删除队列。例如,永远不要删除队列,除非客户端明确取消订阅或长时间处于非活动状态。如果做不到这一点,整个收件箱的想法就会分崩离析。
上面选择的答案与我在这里订阅的最相似,因为订阅是队列。如果我这样做,那么我可能会将其实现为内部总线而不是观察者(因为这是不必要的)——您根据需要为连接的客户端创建一个消费者,它实际上只是转发消息。消息消费者根据客户端是否连接来订阅和取消订阅。正如 Kamal 指出的那样,如果您的规模超过 pubsub 允许的最大订阅数,您将 运行 遇到问题。如果您发现自己处于那个位置,那么您可以通过实施上述模式来解除这种限制。它基本上是相同的模式,但你将责任转移到你的基础设施上,唯一的限制是你自己的资源。
gRPC 使这种机制变得非常简单。或者,对于 Web,如果您使用的是 Microsoft 堆栈,那么 SignalR 也可以让这变得非常简单。客户端连接到集线器,您可以发布到所有连接的客户端。这里的消费者模式基本保持不变,但您不必手动实现观察者模式。
(注:图中箭头指向的是依赖方向,不是数据流向)