Pubsublite 订阅第一条消息非常慢
Pubsublite subscribe extremely slow for first message
我正在使用 GCP Pub/Sub Lite 的 Python 客户端库,主题很简单:只有一个分区和少量消息。我用标准方式订阅:创建 SubscriberClient,然后调用带回调的 subscribe
方法。从调用该方法到收到第一条消息,大约需要 30 秒;后续消息很快,因为它们已经在缓存中。
关于启动时间极长的问题:这是预期行为吗?还是有一些常见的原因?
感谢指点
编辑:代码粘贴在下面(已隐去凭据),在 Docker 中运行。我的打印输出如下:
<pre><code>[2022-02-22 14:22:36.162 __main__] subscribe started
[2022-02-22 14:23:09.187 __main__] got 1
[2022-02-22 14:23:09.189 __main__] got 2
[2022-02-22 14:23:09.189 __main__] got 3</code></pre>
<pre><code># Using python 3.8
from __future__ import annotations
import logging
import pickle
import queue
import uuid
from contextlib import contextmanager
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
LocationPath,
ReservationPath, SubscriptionPath,
TopicPath,
)
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials
# Log line layout: timestamp with milliseconds, logger name, then the message.
FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s'
logging.basicConfig(
    format=FORMAT,
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)
# Only WARNING and above from the Google Cloud client libraries, so the
# script's own INFO lines stay readable.
logging.getLogger('google.cloud').setLevel(logging.WARNING)
class Account:
    """Pub/Sub Lite project/location settings plus an AdminClient.

    Builds resource paths for reservations (in ``region``) and for topics
    and subscriptions (in ``zone``). Wraps the AdminClient calls that create
    and delete those resources.

    NOTE(review): ``region`` is kept as a plain ``str`` and passed to
    ``AdminClient`` unchanged, while ``zone`` is parsed into a
    ``CloudZone``. Confirm that the installed client accepts a string region.
    """

    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        """Return the path of this account's zone."""
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        """Return the path of reservation *name* in this account's region."""
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        """Return the path of topic *name* in this account's zone."""
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        """Return the path of subscription *name* in this account's zone."""
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        """Create throughput reservation *name* with *capacity* units."""
        reservation = GCPReservation(
            name=str(self.reservation_path(name)),
            throughput_capacity=capacity,
        )
        self.client.create_reservation(reservation)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        """Create topic *name* and return a :class:`Topic` handle for it.

        Each of the *partition_count* partitions gets a retention size of
        *partition_size_gib* GiB. Throughput is drawn from the reservation
        named *reservation_name*. A topic name cannot be reused within one
        hour of deletion.
        """
        topic_name = str(self.topic_path(name))
        reservation = str(self.reservation_path(reservation_name))
        bytes_per_partition = partition_size_gib * 1024 ** 3
        spec = GCPTopic(
            name=topic_name,
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=bytes_per_partition),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=reservation),
        )
        self.client.create_topic(spec)
        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        """Delete topic *name* through the admin client."""
        self.client.delete_topic(self.topic_path(name))

    def get_topic(self, name: str) -> Topic:
        """Return a handle to an existing topic *name* (no API call is made)."""
        return Topic(name, self)
class Topic:
    """Handle for a Pub/Sub Lite topic belonging to an :class:`Account`."""

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self, name: str) -> Subscription:
        """Create subscription *name* on this topic and return a handle to it.

        The subscription uses ``DELIVER_IMMEDIATELY`` and is created with
        ``BacklogLocation.BEGINNING``. That presumably makes messages already
        stored in the topic deliverable as well (confirm against the client
        documentation).
        """
        delivery = GCPSubscription.DeliveryConfig
        delivery_config = delivery(
            delivery_requirement=delivery.DeliveryRequirement.DELIVER_IMMEDIATELY)
        spec = GCPSubscription(
            name=str(self.account.subscription_path(name)),
            topic=str(self._path),
            delivery_config=delivery_config,
        )
        self.account.client.create_subscription(spec, BacklogLocation.BEGINNING)
        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        """Delete subscription *name* through the account's admin client."""
        self.account.client.delete_subscription(
            self.account.subscription_path(name))

    def get_subscription(self, name: str):
        """Return a handle to an existing subscription *name* (no API call)."""
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        """Yield an entered Publisher for this topic and close it afterwards.

        Extra keyword arguments are passed to the ``Publisher`` constructor.
        """
        publisher = Publisher(self, **kwargs)
        with publisher as active:
            yield active
class Publisher:
    """Context manager that publishes pickled Python objects to a topic.

    Extra keyword arguments given to the constructor are forwarded to
    ``PublisherClient`` when the context is entered. This lets
    ``Topic.get_publisher(**kwargs)`` pass client options through instead
    of failing with ``TypeError``.
    """

    def __init__(self, topic: Topic, **client_kwargs):
        self.topic = topic
        # Options for PublisherClient (credentials are always taken from the
        # topic's account).
        self._client_kwargs = client_kwargs
        self._publisher = None  # set to the entered PublisherClient in __enter__

    def __enter__(self):
        client = PublisherClient(
            credentials=self.topic.account.credentials, **self._client_kwargs)
        self._publisher = client.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        self._publisher.__exit__(*args, **kwargs)

    def put(self, data) -> None:
        """Publish ``pickle.dumps(data)`` and wait for the publish to complete.

        Waiting on each future serializes publishes, so consecutive ``put``
        calls are published in call order.
        """
        fut = self._publisher.publish(self.topic._path, pickle.dumps(data))
        fut.result()
class Subscription:
    """Handle for a Pub/Sub Lite subscription attached to a :class:`Topic`."""

    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self):
        """Yield an entered Subscriber for this subscription; close it on exit."""
        subscriber = Subscriber(self)
        with subscriber as active:
            yield active
class Subscriber:
    """Context manager that streams messages from a Pub/Sub Lite subscription.

    On entry, a ``SubscriberClient`` is opened and a streaming ``subscribe``
    is started. Its callback pushes every received message onto an internal
    queue, and :meth:`get` pops, acknowledges and unpickles them.

    Note: Google support has stated that subscribers are meant to be
    long-lived. If a subscription has no current subscriber, the first one
    can take up to about a minute before it starts receiving messages.
    Reuse one Subscriber rather than creating one per read.
    """

    def __init__(self, subscription: Subscription):
        self.subscription = subscription
        # Hand-off from the client's callback to get().
        self._messages = queue.Queue()
        self._subscriber = None  # entered SubscriberClient, set in __enter__
        self._subscribe_task = None  # future returned by subscribe()

    def __enter__(self):
        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            self._messages.put(msg)

        flowcontrol = FlowControlSettings(
            messages_outstanding=1000, bytes_outstanding=10 * 1024 * 1024)
        self._subscriber = SubscriberClient(
            credentials=self.subscription.topic.account.credentials)
        self._subscriber.__enter__()
        try:
            self._subscribe_task = self._subscriber.subscribe(
                self.subscription._path, callback, flowcontrol)
        except BaseException as exc:
            # __exit__ will not run when __enter__ fails, so close the
            # already-entered client here instead of leaking it.
            self._subscriber.__exit__(type(exc), exc, exc.__traceback__)
            raise
        logger.info('subscribe started')
        return self

    def __exit__(self, *args, **kwargs):
        try:
            self._subscribe_task.cancel()
            self._subscribe_task.result()
        finally:
            # Always close the client, even if stopping the stream fails.
            self._subscriber.__exit__(*args, **kwargs)

    def get(self, timeout=None):
        """Return the next message payload, acknowledging the message.

        Args:
            timeout: Seconds to wait for a message. ``None`` (the default)
                waits indefinitely.

        Raises:
            queue.Empty: If *timeout* elapses before a message arrives.

        Security: payloads are unpickled, which is only safe when every
        publisher to the topic is trusted.
        """
        msg = self._messages.get(timeout=timeout)
        msg.ack()
        return pickle.loads(msg.data)
def get_account() -> Account:
    """Build the Account used by this script.

    The project id and credentials are placeholders to fill in before
    running. NOTE(review): ``Account`` annotates ``credentials`` as a
    google.oauth2 ``Credentials`` object, but the placeholder is a string.
    Replace it with a real credentials object.
    """
    settings = {
        'project_id': '--fill-in--',
        'region': 'us-central1',
        'zone': 'us-central1-a',
        'credentials': '--fill-in--',
    }
    return Account(**settings)
# This test shows that it takes extremely long to get the first message
# in `subscribe`.
def test(account):
    """End-to-end check: publish three messages, then read them back.

    Creates a uniquely named topic, publishes 1, 2, 3 to it, then creates
    a subscription and reads the messages through a fresh subscriber. The
    time to the first message is logged. The subscription and topic are
    deleted afterwards even if a step fails.

    Raises:
        AssertionError: If the received payloads differ from those sent.
    """
    import time

    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    sent = [1, 2, 3]
    try:
        with topic.get_publisher() as p:
            for item in sent:
                p.put(item)
        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                start = time.monotonic()
                received = [s.get()]
                logger.info('first message after %.1f s',
                            time.monotonic() - start)
                received.extend(s.get() for _ in sent[1:])
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)
    # The topic has a single partition (create_topic default) and each put()
    # waits for its publish to complete, so the messages are expected to
    # arrive in publish order.
    assert received == sent, received
if __name__ == '__main__':
    # Script entry point: make sure the 'default' reservation exists, then
    # run the end-to-end demo against it.
    account = get_account()
    try:
        account.create_reservation('default')
    except AlreadyExists:
        pass  # the reservation persists between runs; reuse it
    test(account)
</code></pre>
Google 支持团队确认这种缓慢是预期行为,并计划将其补充到公开文档中:
“订阅者应该是长期存在的。如果订阅没有当前订阅者,创建第一个订阅者最多可能需要 1 分钟才能开始接收消息。”
我正在使用 GCP Pub/Sub Lite 的 Python 客户端库,主题很简单:只有一个分区和少量消息。我用标准方式订阅:创建 SubscriberClient,然后调用带回调的 subscribe
方法。从调用该方法到收到第一条消息,大约需要 30 秒;后续消息很快,因为它们已经在缓存中。
关于启动时间极长的问题:这是预期行为吗?还是有一些常见的原因?
感谢指点
编辑:代码粘贴在下面(已隐去凭据),在 Docker 中运行。我的打印输出如下:
<pre><code>[2022-02-22 14:22:36.162 __main__] subscribe started
[2022-02-22 14:23:09.187 __main__] got 1
[2022-02-22 14:23:09.189 __main__] got 2
[2022-02-22 14:23:09.189 __main__] got 3</code></pre>
<pre><code># Using python 3.8
from __future__ import annotations
import logging
import pickle
import queue
import uuid
from contextlib import contextmanager
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
LocationPath,
ReservationPath, SubscriptionPath,
TopicPath,
)
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials
# Log line layout: timestamp with milliseconds, logger name, then the message.
FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s'
logging.basicConfig(
    format=FORMAT,
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)
# Only WARNING and above from the Google Cloud client libraries, so the
# script's own INFO lines stay readable.
logging.getLogger('google.cloud').setLevel(logging.WARNING)
class Account:
    """Pub/Sub Lite project/location settings plus an AdminClient.

    Builds resource paths for reservations (in ``region``) and for topics
    and subscriptions (in ``zone``). Wraps the AdminClient calls that create
    and delete those resources.

    NOTE(review): ``region`` is kept as a plain ``str`` and passed to
    ``AdminClient`` unchanged, while ``zone`` is parsed into a
    ``CloudZone``. Confirm that the installed client accepts a string region.
    """

    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        """Return the path of this account's zone."""
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        """Return the path of reservation *name* in this account's region."""
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        """Return the path of topic *name* in this account's zone."""
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        """Return the path of subscription *name* in this account's zone."""
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        """Create throughput reservation *name* with *capacity* units."""
        reservation = GCPReservation(
            name=str(self.reservation_path(name)),
            throughput_capacity=capacity,
        )
        self.client.create_reservation(reservation)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        """Create topic *name* and return a :class:`Topic` handle for it.

        Each of the *partition_count* partitions gets a retention size of
        *partition_size_gib* GiB. Throughput is drawn from the reservation
        named *reservation_name*. A topic name cannot be reused within one
        hour of deletion.
        """
        topic_name = str(self.topic_path(name))
        reservation = str(self.reservation_path(reservation_name))
        bytes_per_partition = partition_size_gib * 1024 ** 3
        spec = GCPTopic(
            name=topic_name,
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=bytes_per_partition),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=reservation),
        )
        self.client.create_topic(spec)
        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        """Delete topic *name* through the admin client."""
        self.client.delete_topic(self.topic_path(name))

    def get_topic(self, name: str) -> Topic:
        """Return a handle to an existing topic *name* (no API call is made)."""
        return Topic(name, self)
class Topic:
    """Handle for a Pub/Sub Lite topic belonging to an :class:`Account`."""

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self, name: str) -> Subscription:
        """Create subscription *name* on this topic and return a handle to it.

        The subscription uses ``DELIVER_IMMEDIATELY`` and is created with
        ``BacklogLocation.BEGINNING``. That presumably makes messages already
        stored in the topic deliverable as well (confirm against the client
        documentation).
        """
        delivery = GCPSubscription.DeliveryConfig
        delivery_config = delivery(
            delivery_requirement=delivery.DeliveryRequirement.DELIVER_IMMEDIATELY)
        spec = GCPSubscription(
            name=str(self.account.subscription_path(name)),
            topic=str(self._path),
            delivery_config=delivery_config,
        )
        self.account.client.create_subscription(spec, BacklogLocation.BEGINNING)
        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        """Delete subscription *name* through the account's admin client."""
        self.account.client.delete_subscription(
            self.account.subscription_path(name))

    def get_subscription(self, name: str):
        """Return a handle to an existing subscription *name* (no API call)."""
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        """Yield an entered Publisher for this topic and close it afterwards.

        Extra keyword arguments are passed to the ``Publisher`` constructor.
        """
        publisher = Publisher(self, **kwargs)
        with publisher as active:
            yield active
class Publisher:
    """Context manager that publishes pickled Python objects to a topic.

    Extra keyword arguments given to the constructor are forwarded to
    ``PublisherClient`` when the context is entered. This lets
    ``Topic.get_publisher(**kwargs)`` pass client options through instead
    of failing with ``TypeError``.
    """

    def __init__(self, topic: Topic, **client_kwargs):
        self.topic = topic
        # Options for PublisherClient (credentials are always taken from the
        # topic's account).
        self._client_kwargs = client_kwargs
        self._publisher = None  # set to the entered PublisherClient in __enter__

    def __enter__(self):
        client = PublisherClient(
            credentials=self.topic.account.credentials, **self._client_kwargs)
        self._publisher = client.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        self._publisher.__exit__(*args, **kwargs)

    def put(self, data) -> None:
        """Publish ``pickle.dumps(data)`` and wait for the publish to complete.

        Waiting on each future serializes publishes, so consecutive ``put``
        calls are published in call order.
        """
        fut = self._publisher.publish(self.topic._path, pickle.dumps(data))
        fut.result()
class Subscription:
    """Handle for a Pub/Sub Lite subscription attached to a :class:`Topic`."""

    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self):
        """Yield an entered Subscriber for this subscription; close it on exit."""
        subscriber = Subscriber(self)
        with subscriber as active:
            yield active
class Subscriber:
    """Context manager that streams messages from a Pub/Sub Lite subscription.

    On entry, a ``SubscriberClient`` is opened and a streaming ``subscribe``
    is started. Its callback pushes every received message onto an internal
    queue, and :meth:`get` pops, acknowledges and unpickles them.

    Note: Google support has stated that subscribers are meant to be
    long-lived. If a subscription has no current subscriber, the first one
    can take up to about a minute before it starts receiving messages.
    Reuse one Subscriber rather than creating one per read.
    """

    def __init__(self, subscription: Subscription):
        self.subscription = subscription
        # Hand-off from the client's callback to get().
        self._messages = queue.Queue()
        self._subscriber = None  # entered SubscriberClient, set in __enter__
        self._subscribe_task = None  # future returned by subscribe()

    def __enter__(self):
        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            self._messages.put(msg)

        flowcontrol = FlowControlSettings(
            messages_outstanding=1000, bytes_outstanding=10 * 1024 * 1024)
        self._subscriber = SubscriberClient(
            credentials=self.subscription.topic.account.credentials)
        self._subscriber.__enter__()
        try:
            self._subscribe_task = self._subscriber.subscribe(
                self.subscription._path, callback, flowcontrol)
        except BaseException as exc:
            # __exit__ will not run when __enter__ fails, so close the
            # already-entered client here instead of leaking it.
            self._subscriber.__exit__(type(exc), exc, exc.__traceback__)
            raise
        logger.info('subscribe started')
        return self

    def __exit__(self, *args, **kwargs):
        try:
            self._subscribe_task.cancel()
            self._subscribe_task.result()
        finally:
            # Always close the client, even if stopping the stream fails.
            self._subscriber.__exit__(*args, **kwargs)

    def get(self, timeout=None):
        """Return the next message payload, acknowledging the message.

        Args:
            timeout: Seconds to wait for a message. ``None`` (the default)
                waits indefinitely.

        Raises:
            queue.Empty: If *timeout* elapses before a message arrives.

        Security: payloads are unpickled, which is only safe when every
        publisher to the topic is trusted.
        """
        msg = self._messages.get(timeout=timeout)
        msg.ack()
        return pickle.loads(msg.data)
def get_account() -> Account:
    """Build the Account used by this script.

    The project id and credentials are placeholders to fill in before
    running. NOTE(review): ``Account`` annotates ``credentials`` as a
    google.oauth2 ``Credentials`` object, but the placeholder is a string.
    Replace it with a real credentials object.
    """
    settings = {
        'project_id': '--fill-in--',
        'region': 'us-central1',
        'zone': 'us-central1-a',
        'credentials': '--fill-in--',
    }
    return Account(**settings)
# This test shows that it takes extremely long to get the first message
# in `subscribe`.
def test(account):
    """End-to-end check: publish three messages, then read them back.

    Creates a uniquely named topic, publishes 1, 2, 3 to it, then creates
    a subscription and reads the messages through a fresh subscriber. The
    time to the first message is logged. The subscription and topic are
    deleted afterwards even if a step fails.

    Raises:
        AssertionError: If the received payloads differ from those sent.
    """
    import time

    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    sent = [1, 2, 3]
    try:
        with topic.get_publisher() as p:
            for item in sent:
                p.put(item)
        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                start = time.monotonic()
                received = [s.get()]
                logger.info('first message after %.1f s',
                            time.monotonic() - start)
                received.extend(s.get() for _ in sent[1:])
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)
    # The topic has a single partition (create_topic default) and each put()
    # waits for its publish to complete, so the messages are expected to
    # arrive in publish order.
    assert received == sent, received
if __name__ == '__main__':
    # Script entry point: make sure the 'default' reservation exists, then
    # run the end-to-end demo against it.
    account = get_account()
    try:
        account.create_reservation('default')
    except AlreadyExists:
        pass  # the reservation persists between runs; reuse it
    test(account)
</code></pre>
Google 支持团队确认这种缓慢是预期行为,并计划将其补充到公开文档中:
“订阅者应该是长期存在的。如果订阅没有当前订阅者,创建第一个订阅者最多可能需要 1 分钟才能开始接收消息。”