关闭 Google Cloud PubSub 客户端,未来 运行

Closing Google Cloud PubSub client with future running

我正在使用 Cloud Pubsub,对于我们的一个系统,我开始遇到“打开的文件太多”的问题。 lsof 显示对 Google Cloud 的大量请求,我很确定这些请求是 pubsub。

谷歌搜索引导我 https://github.com/googleapis/google-cloud-python/issues/5523 这表明我需要明确关闭传输。

问题是我正在使用助手 python 包(它被大约 50-100 个其他服务调用)来发布我的消息,大致如下所示:

def pubsub_callback(future):
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)

def send_oneoff_pubsub_message(self, client=None):
    if not client:
        client = self.get_client('pubsubpub') # Creates a pubsub publisher client
    
    future = client.publish({...})
    try:
        future.exception(timeout=10)
    except Exception as exc:
        print("error")

    future.add_done_callback(pubsub_callback)

现在在许多地方,我们正在慢慢重构以在函数之外显式创建客户端(因此我们不会创建太多客户端)。但是我仍然想重构它以在消息发布后关闭客户端。

链接问题建议在您完成客户端后 client.api.transport._channel.close()。但是,在这种情况下,我只是在 pubsub_callback 被触发后才完成它。

我没有看到任何方式从未来获取客户端,回调 add_done_callback 不允许(正确)不允许发送参数。

有什么创造性的解决方案吗?

无论如何我都需要硬着头皮重构繁重的 pubsub 客户端,但这并不总是明确的。

更新:
查看代码,似乎这将在 future 之后成功关闭客户端:

def send_oneoff_pubsub_message(self, client=None):
    if not client:
        client = self.get_client('pubsubpub') # Creates a pubsub publisher client

    future = client.publish({...})
    try:
        future.exception(timeout=10)
    except Exception as exc:
        print("error")

    future.add_done_callback(pubsub_callback)
    future.result(timeout=10)
    client.api.transport._channel.close()

这种方法有什么缺点吗?除了函数块直到发布(这对我来说没问题)

您的代码混合了异步操作和同步操作。调用发布后,您最多等待 10 秒以等待未来的异常。在大多数情况下,在 10 秒内,无论如何发布都会完成,所以到那时,您不妨同步调用 future.result,甚至不必理会 add_done_callback:

def pubsub_callback(future):
  try:
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)
  except Exception as exc:
    print("error")

def send_oneoff_pubsub_message(self, client=None):
  if not client:
    client_created = True
    client = self.get_client('pubsubpub') # Creates a pubsub publisher client

  future = client.publish({...})
  pubsub_callback(future)

  if client_created:
    client.api.transport._channel.close()

如果你想异步做事,你可以使用 functools.partial:

from functools import partial

def pubsub_callback(client, future):
  try:
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)
  except Exception as exc:
    print("error")

  if client:
    client.api.transport._channel.close()

def send_oneoff_pubsub_message(self, client=None):
  if not client:
    client_created = True
    client = self.get_client('pubsubpub') # Creates a pubsub publisher client

  if client_created:
    callback = partial(pubsub_callback, client)
  else:
    callback = partial(pubsub_callback, None)

  future = client.publish({...})
  future.add_done_callback(callback)

任何一种方法都应该允许您在所需的时间点关闭客户端。