Pubsublite 消息确认不起作用

Pubsublite message acknowledgement not working

我正在使用 Google Pub/Sub Lite,测试用的是一个只有单个分区和少量消息的小型主题,客户端是 Python 客户端库。我使用回调执行标准的 SubscriberClient.subscribe。回调将消息放入队列中;当消息从队列中取出用于消费时,调用它的 ack。当我想停止时,我调用 subscribe_future.cancel(); subscribe_future.result(),并丢弃队列中尚未消费的消息。

假设我知道该主题有 30 条消息。在停止之前我消费了其中的 10 条。然后我在同一个订阅上重新启动一个新的 SubscriberClient 并接收消息。我期望从第 11 条消息开始,但收到的却是第 1 条。之前的订阅者已经确认了前 10 条,但看起来服务器并没有收到这些确认。

我认为 ack 可能需要一些时间才能到达服务器。所以我等了 2 分钟才开始第二次订阅。没有帮助。

然后我想,也许订阅者对象会缓冲 ack 调用,需要在取消之前“刷新”它们,但我找到了另一个关于这一点的讨论。

我错过了什么?谢谢。

这是代码。如果您有 Pub/Sub Lite 帐户,填写凭据后即可运行这段代码。代码展示了两个问题:一个是本问题的主题;另一个我在另一个问题中询问。

    # Using python 3.8
    
    from __future__ import annotations
    
    import logging
    import pickle
    import queue
    import time
    import uuid
    from concurrent.futures import ThreadPoolExecutor
    from contextlib import contextmanager
    from typing import Union, Optional
    
    from google.api_core.exceptions import AlreadyExists
    from google.cloud.pubsub_v1.types import BatchSettings
    from google.cloud.pubsublite import AdminClient, PubSubMessage
    from google.cloud.pubsublite import Reservation as GCPReservation
    from google.cloud.pubsublite import Subscription as GCPSubscription
    from google.cloud.pubsublite import Topic as GCPTopic
    from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
                                                     SubscriberClient)
    from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
                                               LocationPath,
                                               ReservationPath, SubscriptionPath,
                                               TopicPath,
                                               )
    from google.cloud.pubsublite.types import FlowControlSettings
    from google.oauth2.service_account import Credentials
    
    
    # Log line layout: "[YYYY-mm-dd HH:MM:SS.mmm logger-name]  message".
    FORMAT = '[%(asctime)s.%(msecs)03d %(name)s]  %(message)s'
    logging.basicConfig(
        level=logging.INFO,
        format=FORMAT,
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    # Keep the Google Cloud client libraries quiet: warnings and above only.
    logging.getLogger('google.cloud').setLevel(logging.WARNING)

    # Logger used throughout this script.
    logger = logging.getLogger(__name__)
    
    
    class Account:
        """Project/location settings for Pub/Sub Lite plus an admin client.

        Builds resource paths for reservations, topics and subscriptions and
        wraps the admin calls (create/delete) used by the tests in this file.
        """

        def __init__(self,
                     project_id: str,
                     region: str,
                     zone: str,
                     credentials: Credentials,
                     ):
            self.project_id = project_id
            self.region = region
            # ``zone`` arrives as text such as 'us-central1-a'.
            self.zone = CloudZone.parse(zone)
            self.credentials = credentials
            self.client = AdminClient(region=region, credentials=credentials)

        def location_path(self) -> LocationPath:
            """Return the location path built from the project id and zone."""
            return LocationPath(self.project_id, self.zone)

        def reservation_path(self, name: str) -> ReservationPath:
            """Return the path of reservation *name* (project id + region)."""
            return ReservationPath(self.project_id, self.region, name)

        def topic_path(self, name: str) -> TopicPath:
            """Return the path of topic *name* (project id + zone)."""
            return TopicPath(self.project_id, self.zone, name)

        def subscription_path(self, name: str) -> SubscriptionPath:
            """Return the path of subscription *name* (project id + zone)."""
            return SubscriptionPath(self.project_id, self.zone, name)

        def create_reservation(self, name: str, *, capacity: int = 32) -> None:
            """Create throughput reservation *name* with *capacity* units."""
            spec = GCPReservation(name=str(self.reservation_path(name)),
                                  throughput_capacity=capacity)
            self.client.create_reservation(spec)

        def create_topic(self,
                         name: str,
                         *,
                         partition_count: int = 1,
                         partition_size_gib: int = 30,
                         reservation_name: str = 'default') -> Topic:
            """Create topic *name* and return a :class:`Topic` handle for it.

            Each partition retains up to *partition_size_gib* GiB, and the
            topic draws throughput from reservation *reservation_name*.
            NOTE: a topic name can not be reused within one hour of deletion.
            """
            topic_path = self.topic_path(name)
            reservation_path = self.reservation_path(reservation_name)
            retention_bytes = partition_size_gib * 1024 ** 3

            spec = GCPTopic(
                name=str(topic_path),
                partition_config=GCPTopic.PartitionConfig(count=partition_count),
                retention_config=GCPTopic.RetentionConfig(
                    per_partition_bytes=retention_bytes),
                reservation_config=GCPTopic.ReservationConfig(
                    throughput_reservation=str(reservation_path)))
            self.client.create_topic(spec)

            return Topic(name, self)

        def delete_topic(self, name: str) -> None:
            """Delete topic *name*."""
            self.client.delete_topic(self.topic_path(name))

        def get_topic(self, name: str) -> Topic:
            """Return a handle for an existing topic without any service call."""
            return Topic(name, self)
    
    
    class Topic:
        """Handle for a single Pub/Sub Lite topic that belongs to an :class:`Account`."""

        def __init__(self, name: str, account: Account):
            self.account = account
            self.name = name
            self._path = self.account.topic_path(name)

        def create_subscription(self,
                                name: str,
                                *,
                                pos: Optional[str] = None) -> Subscription:
            """Create subscription *name* on this topic and return a handle to it.

            Args:
                name: id of the new subscription.
                pos: where the subscription starts reading: ``'beginning'``
                    (also used when None) or ``'end'``.

            Raises:
                ValueError: if *pos* is any other value.
            """
            path = self.account.subscription_path(name)

            if pos is None or pos == 'beginning':
                starting_offset = BacklogLocation.BEGINNING
            elif pos == 'end':
                starting_offset = BacklogLocation.END
            else:
                raise ValueError(
                    'Argument pos only accepts one of two values - "beginning" or "end"; '
                    f'got {pos!r}'
                )

            delivery = GCPSubscription.DeliveryConfig
            subscription = GCPSubscription(
                name=str(path),
                topic=str(self._path),
                delivery_config=delivery(
                    delivery_requirement=delivery.DeliveryRequirement.DELIVER_IMMEDIATELY))

            self.account.client.create_subscription(subscription, starting_offset)

            return Subscription(name, self)

        def delete_subscription(self, name: str) -> None:
            """Delete subscription *name*."""
            path = self.account.subscription_path(name)
            self.account.client.delete_subscription(path)

        def get_subscription(self, name: str) -> Subscription:
            """Return a handle for an existing subscription without any service call."""
            return Subscription(name, self)

        @contextmanager
        def get_publisher(self, **kwargs):
            """Yield a :class:`Publisher` for this topic; *kwargs* are forwarded to it."""
            with Publisher(self, **kwargs) as pub:
                yield pub
    
    
    class Publisher:
        def __init__(self, topic: Topic, *, batch_size: int = 100):
            self.topic = topic
            self._batch_config = {
                    'max_bytes': 3 * 1024 * 1024,  # 3 Mb; must be  None:
            self._messages.put(data)
    
    
    class Subscription:
        """Handle for one Pub/Sub Lite subscription attached to a :class:`Topic`."""

        def __init__(self, name: str, topic: Topic):
            self.name = name
            self.topic = topic
            self._path = topic.account.subscription_path(name)

        @contextmanager
        def get_subscriber(self, *, backlog=None):
            """Context manager yielding a started :class:`Subscriber` for this subscription.

            *backlog* is forwarded unchanged to the Subscriber.
            """
            runner = Subscriber(self, backlog=backlog)
            with runner as active:
                yield active
    
    
    class Subscriber:
        """Pull messages from a subscription on a worker thread into a queue.

        Use as a context manager:

        - ``__enter__`` starts one worker thread. It opens a
          ``SubscriberClient`` streaming pull and puts every delivered
          message onto a bounded queue.
        - :meth:`get` pops one message, acks it and returns its unpickled
          payload.
        - ``__exit__`` stops the pull and discards messages that were queued
          but never consumed. Those messages are never acked.
        """

        # Delay between a stop request and cancelling the streaming-pull future.
        # msg.ack() does not reach the server immediately. Per the workaround
        # reported for this problem, the client flushes acks periodically
        # (google.cloud.pubsublite.cloudpubsub.internal.make_subscriber
        # ._DEFAULT_FLUSH_SECONDS, reportedly 0.1 s). Cancelling straight away
        # can drop acks that were not sent yet, so the next subscriber on the
        # same subscription re-receives already-consumed messages.
        DEFAULT_ACK_FLUSH_SECONDS = 0.5

        # How often __exit__ re-checks whether the worker thread has died
        # while waiting for the end-of-stream sentinel.
        _EXIT_POLL_SECONDS = 0.05

        def __init__(self, subscription: Subscription, backlog: Optional[int] = None,
                     *, ack_flush_seconds: Optional[float] = None):
            """
            Args:
                subscription: subscription to pull from.
                backlog: queue size, also used as ``messages_outstanding`` for
                    flow control. Falsy values (None, 0) mean 100.
                ack_flush_seconds: time given to the client to send buffered
                    acks before the pull is cancelled. None means
                    DEFAULT_ACK_FLUSH_SECONDS.
            """
            self.subscription = subscription
            self._backlog = backlog or 100
            self._ack_flush_seconds = (self.DEFAULT_ACK_FLUSH_SECONDS
                                       if ack_flush_seconds is None
                                       else ack_flush_seconds)
            self._cancel_requested: bool = False
            self._messages: Optional[queue.Queue] = None
            self._pool: Optional[ThreadPoolExecutor] = None
            # Sentinel the worker posts to the queue once it has shut down cleanly.
            self._NOMORE = object()
            self._subscribe_task = None

        def __enter__(self):
            self._cancel_requested = False
            self._pool = ThreadPoolExecutor(1).__enter__()
            self._messages = queue.Queue(self._backlog)
            messages = self._messages

            def callback(msg: PubSubMessage):
                # Only pay for unpickling when the log line is actually emitted.
                # NOTE(review): pickle.loads on received data is only safe for trusted publishers.
                if logger.isEnabledFor(logging.INFO):
                    logger.info('got %s', pickle.loads(msg.data))
                messages.put(msg)

            def _subscribe():
                flowcontrol = FlowControlSettings(
                        messages_outstanding=self._backlog,
                        bytes_outstanding=1024 * 1024 * 10)

                subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
                with subscriber:
                    fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol)
                    logger.info('subscribe sent to gcp')

                    # Wait until __exit__ asks us to stop.
                    while not self._cancel_requested:
                        time.sleep(0.003)

                    # Give the client a chance to send the acks issued so far
                    # before the stream is torn down (see DEFAULT_ACK_FLUSH_SECONDS).
                    time.sleep(self._ack_flush_seconds)
                    fut.cancel()
                    fut.result()

                    # Drop queued (un-acked) messages, then post the sentinel.
                    # The callback may refill the queue between the drain and
                    # the put, so retry until the sentinel fits.
                    while True:
                        try:
                            while True:
                                messages.get_nowait()
                        except queue.Empty:
                            pass
                        try:
                            messages.put_nowait(self._NOMORE)
                            break
                        except queue.Full:
                            continue

            self._subscribe_task = self._pool.submit(_subscribe)
            return self

        def __exit__(self, *args, **kwargs):
            if self._pool is None:
                return
            try:
                if self._subscribe_task is not None:
                    self._cancel_requested = True
                    self._discard_until_worker_stops()
                    try:
                        # Re-raise any exception from the worker thread.
                        self._subscribe_task.result()
                    finally:
                        self._subscribe_task = None
                        self._messages = None
            finally:
                self._pool.__exit__(*args, **kwargs)
                self._pool = None

        def _discard_until_worker_stops(self):
            """Drain the queue until the worker posts _NOMORE or terminates.

            The worker only posts the sentinel on a clean shutdown. If it
            raised instead, a blocking get() here would wait forever.
            """
            while True:
                try:
                    item = self._messages.get(timeout=self._EXIT_POLL_SECONDS)
                except queue.Empty:
                    if self._subscribe_task.done():
                        return
                    continue
                if item is self._NOMORE:
                    return

        def get(self, timeout=None):
            """Pop the next message, ack it and return its unpickled payload.

            Args:
                timeout: None blocks until a message arrives, 0 does not
                    block, and any other value waits up to that many seconds.

            Raises:
                queue.Empty: if no message is available within *timeout*.
            """
            if timeout == 0:
                msg = self._messages.get_nowait()
            else:
                msg = self._messages.get(block=True, timeout=timeout)
            # NOTE(review): pickle.loads on received data is only safe for trusted publishers.
            data = pickle.loads(msg.data)
            msg.ack()
            return data
    
    
    def get_account() -> Account:
        """Return the Account the tests below run against.

        Fill in the project id and credentials before running.
        NOTE(review): Account annotates ``credentials`` as a google.oauth2
        Credentials object; the string here is only a placeholder.
        """
        settings = dict(
            project_id='--fill-in-proj-id--',
            region='us-central1',
            zone='us-central1-a',
            credentials='--fill-in-creds--',
        )
        return Account(**settings)
    
    
    # This test shows that it takes extremely long to get the first messsage
    # in `subscribe`.
    def test1(account):
        """Time how long the first message takes to arrive through ``subscribe``.

        Creates a throwaway topic holding three messages and a subscription
        on it, times the first ``get()``, then deletes both resources.
        """
        name = f'test-{uuid.uuid4()}'
        topic = account.create_topic(name)
        try:
            with topic.get_publisher() as publisher:
                for value in (1, 2, 3):
                    publisher.put(value)

            sub = topic.create_subscription(name)
            try:
                with sub.get_subscriber() as subscriber:
                    started = time.time()
                    logger.info('getting the first message')
                    first = subscriber.get()
                    finished = time.time()
                    logger.info('  got the first message')
                    print(first)
                print('getting the first msg took', finished - started, 'seconds')
            finally:
                topic.delete_subscription(name)
        finally:
            account.delete_topic(name)
    
    
    def test2(account):
        """Check that a new subscriber resumes after the messages acked by a previous one.

        Publishes 30 integers and consumes the first 10 with one subscriber.
        It then opens a second subscriber on the same subscription and
        expects it to continue at 10. The reported problem is that it
        restarts at 0 instead, because the first subscriber's acks never
        reached the server. On such a mismatch this prints "<got> != <expected>"
        and returns.
        """
        name = 'test-' + str(uuid.uuid4())
        topic = account.create_topic(name)
        n_messages = 30
        try:
            with topic.get_publisher(batch_size=1) as p:
                for i in range(n_messages):
                    p.put(i)

            sub = topic.create_subscription(name)
            try:
                # Explicit comparisons instead of `assert`: asserts are
                # stripped under `python -O`, which would silently disable
                # both checks.
                with sub.get_subscriber() as s:
                    for i in range(10):
                        z = s.get()
                        if z != i:
                            raise AssertionError(f'{z} != {i}')

                # The second subscriber should pick up where the first left off.
                with sub.get_subscriber() as s:
                    for i in range(10, 20):
                        z = s.get()
                        if z != i:
                            print(z, '!=', i)
                            return
            finally:
                topic.delete_subscription(name)
        finally:
            account.delete_topic(name)
    
    
    if __name__ == '__main__':
        account = get_account()

        # Topics are created against the 'default' reservation; it may
        # already exist from an earlier run.
        try:
            account.create_reservation('default')
        except AlreadyExists:
            pass

        test1(account)
        print('')
        test2(account)

我无法重现您的问题,但我认为您应该查看官方文档中关于使用 Cloud Pub/Sub Lite 的说明,看看其中是如何处理的。

这是我从 Receiving messages 示例中提取并修改的代码,它按预期工作:它会从 lite 主题获取消息并确认,以避免再次收到同一条消息。如果重新运行,只有在有新数据可拉取时才会收到数据。我附上了代码,以便您对照检查与您的代码有何不同。

consumer.py

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
    MessageMetadata, 
)
from google.cloud.pubsub_v1.types import PubsubMessage

# TODO(developer): replace these placeholders with your own values.
# (The original `project_number = project-number` raised NameError.)
project_number = 1122334455  # your project number
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
timeout = 90  # seconds to listen before shutting the subscriber down

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

# Passed below as per_partition_flow_control_settings: caps on the number of
# un-acked messages and bytes delivered per partition.
per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    """Print a received message with its ordering key and decoded metadata, then ack it."""
    text = message.data.decode("utf-8")
    meta = MessageMetadata.decode(message.message_id)
    print(
        f"Received {text} of ordering key {message.ordering_key} with id {meta}."
    )
    message.ack()

import time

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    # A tuple is required: `TimeoutError or KeyboardInterrupt` evaluates to
    # just TimeoutError, so Ctrl-C would never reach this handler.
    except (TimeoutError, KeyboardInterrupt):
        # Acks from `callback` are flushed to the server periodically
        # (reportedly every 0.1 s). Wait briefly so the last ones are not
        # lost when the stream is cancelled.
        time.sleep(0.5)
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

我唯一能复现您这种情况的方式是使用不同的订阅。但在这种情况下,当不同的订阅从同一主题拉取消息时,每个订阅都会收到相同的已存储消息,如 Receiving messages from Lite subscriptions 中所述。

考虑一下:

我找到了解决办法。在取消 `subscribe` 返回的 future 之前,需要先睡眠一小段时间,让确认得以刷新(即发送出去)。具体来说,google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS(值为 0.1)似乎就是需要关注的时间;睡眠时间要比它稍长一些才能确保确认已发送。

这是 google 包中的一个 bug。取消 future 应当意味着放弃尚未处理的消息,但已提交的确认仍应被发送出去。这个 bug 可能一直没有被注意到,因为重复投递消息本身并不算错误。