关闭 Google Cloud PubSub 客户端,未来 运行
Closing Google Cloud PubSub client with future running
我正在使用 Cloud Pubsub,对于我们的一个系统,我开始遇到“打开的文件太多”的问题。 lsof
显示对 Google Cloud 的大量请求,我很确定这些请求是 pubsub。
谷歌搜索引导我 https://github.com/googleapis/google-cloud-python/issues/5523 这表明我需要明确关闭传输。
问题是我正在使用助手 python 包(它被大约 50-100 个其他服务调用)来发布我的消息,大致如下所示:
def pubsub_callback(future):
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
def send_oneoff_pubsub_message(self, client=None):
if not client:
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
try:
future.exception(timeout=10)
except Exception as exc:
print("error")
future.add_done_callback(pubsub_callback)
现在在许多地方,我们正在慢慢重构以在函数之外显式创建客户端(因此我们不会创建太多客户端)。但是我仍然想重构它以在消息发布后关闭客户端。
链接问题建议在您完成客户端后 client.api.transport._channel.close()
。但是,在这种情况下,我只是在 pubsub_callback
被触发后才完成它。
我没有看到任何方式从未来获取客户端,回调 add_done_callback
不允许(正确)不允许发送参数。
有什么创造性的解决方案吗?
无论如何我都需要硬着头皮重构繁重的 pubsub 客户端,但这并不总是明确的。
更新:
查看代码,似乎这将在 future 之后成功关闭客户端:
def send_oneoff_pubsub_message(self, client=None):
if not client:
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
try:
future.exception(timeout=10)
except Exception as exc:
print("error")
future.add_done_callback(pubsub_callback)
future.result(timeout=10)
client.api.transport._channel.close()
这种方法有什么缺点吗?除了函数块直到发布(这对我来说没问题)
您的代码混合了异步操作和同步操作。调用发布后,您最多等待 10 秒以等待未来的异常。在大多数情况下,在 10 秒内,无论如何发布都会完成,所以到那时,您不妨同步调用 future.result
,甚至不必理会 add_done_callback
:
def pubsub_callback(future):
try:
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
except Exception as exc:
print("error")
def send_oneoff_pubsub_message(self, client=None):
if not client:
client_created = True
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
pubsub_callback(future)
if client_created:
client.api.transport._channel.close()
如果你想异步做事,你可以使用 functools.partial
:
from functools import partial
def pubsub_callback(client, future):
try:
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
except Exception as exc:
print("error")
if client:
client.api.transport._channel.close()
def send_oneoff_pubsub_message(self, client=None):
if not client:
client_created = True
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
if client_created:
callback = partial(pubsub_callback, client)
else:
callback = partial(pubsub_callback, None)
future = client.publish({...})
future.add_done_callback(callback)
任何一种方法都应该允许您在所需的时间点关闭客户端。
我正在使用 Cloud Pubsub,对于我们的一个系统,我开始遇到“打开的文件太多”的问题。 lsof
显示对 Google Cloud 的大量请求,我很确定这些请求是 pubsub。
谷歌搜索引导我 https://github.com/googleapis/google-cloud-python/issues/5523 这表明我需要明确关闭传输。
问题是我正在使用助手 python 包(它被大约 50-100 个其他服务调用)来发布我的消息,大致如下所示:
def pubsub_callback(future):
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
def send_oneoff_pubsub_message(self, client=None):
if not client:
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
try:
future.exception(timeout=10)
except Exception as exc:
print("error")
future.add_done_callback(pubsub_callback)
现在在许多地方,我们正在慢慢重构以在函数之外显式创建客户端(因此我们不会创建太多客户端)。但是我仍然想重构它以在消息发布后关闭客户端。
链接问题建议在您完成客户端后 client.api.transport._channel.close()
。但是,在这种情况下,我只是在 pubsub_callback
被触发后才完成它。
我没有看到任何方式从未来获取客户端,回调 add_done_callback
不允许(正确)不允许发送参数。
有什么创造性的解决方案吗?
无论如何我都需要硬着头皮重构繁重的 pubsub 客户端,但这并不总是明确的。
更新:
查看代码,似乎这将在 future 之后成功关闭客户端:
def send_oneoff_pubsub_message(self, client=None):
if not client:
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
try:
future.exception(timeout=10)
except Exception as exc:
print("error")
future.add_done_callback(pubsub_callback)
future.result(timeout=10)
client.api.transport._channel.close()
这种方法有什么缺点吗?除了函数块直到发布(这对我来说没问题)
您的代码混合了异步操作和同步操作。调用发布后,您最多等待 10 秒以等待未来的异常。在大多数情况下,在 10 秒内,无论如何发布都会完成,所以到那时,您不妨同步调用 future.result
,甚至不必理会 add_done_callback
:
def pubsub_callback(future):
try:
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
except Exception as exc:
print("error")
def send_oneoff_pubsub_message(self, client=None):
if not client:
client_created = True
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
future = client.publish({...})
pubsub_callback(future)
if client_created:
client.api.transport._channel.close()
如果你想异步做事,你可以使用 functools.partial
:
from functools import partial
def pubsub_callback(client, future):
try:
message_id = future.result()
LOGGER.info("Successfully published %s", message_id)
except Exception as exc:
print("error")
if client:
client.api.transport._channel.close()
def send_oneoff_pubsub_message(self, client=None):
if not client:
client_created = True
client = self.get_client('pubsubpub') # Creates a pubsub publisher client
if client_created:
callback = partial(pubsub_callback, client)
else:
callback = partial(pubsub_callback, None)
future = client.publish({...})
future.add_done_callback(callback)
任何一种方法都应该允许您在所需的时间点关闭客户端。