Pubsublite 消息确认不起作用
Pubsublite message acknowledgement not working
我正在使用 Google Pub/Sub Lite:一个只有单个分区和少量消息的小型测试主题,使用 Python 客户端库。我通过回调执行标准的 SubscriberClient.subscribe。回调将消息放入队列中;当消息从队列中取出用于消费时,调用它的 ack。当我想停止时,我调用 subscribe_future.cancel(); subscribe_future.result(),并丢弃队列中未使用的消息。
假设我知道该主题有 30 条消息。我在停止之前消耗了其中的 10 个。然后我在同一个订阅中重新启动一个新的 SubscriberClient
并接收消息。我希望从第 11 条消息开始,但实际上却从第一条开始。也就是说,之前的订阅者已经确认了前 10 条,但服务器好像没有收到这些确认。
我认为 ack 可能需要一些时间才能到达服务器。所以我等了 2 分钟才开始第二次订阅。没有帮助。
然后我想,也许是订阅者对象在管理 ack 调用,我需要在取消之前先“刷新”(flush)它们,但我发现了另一个关于这一点的问题。
我错过了什么?谢谢。
这是代码。如果您有 pubsublite 帐户,则在您填写凭据后可以执行代码。代码显示了两个问题,一个是本题的主题;另一个在
询问
# Using python 3.8
from __future__ import annotations
import logging
import pickle
import queue
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Union, Optional
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
LocationPath,
ReservationPath, SubscriptionPath,
TopicPath,
)
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials
# Log line layout: timestamp with milliseconds, logger name, then the message.
FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s'
logging.basicConfig(
    format=FORMAT,
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
)
# Only let warnings and above through from the Google client libraries.
logging.getLogger('google.cloud').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class Account:
    """A Pub/Sub Lite project location together with an ``AdminClient``.

    Builds resource paths from the project id, region and zone, and creates
    or deletes reservations and topics through the admin client.
    """

    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        """Store the location and open an admin client for ``region``.

        Args:
            project_id: Project identifier used in every resource path.
            region: Cloud region name, e.g. ``'us-central1'``.
            zone: Zone string handed to ``CloudZone.parse``,
                e.g. ``'us-central1-a'``.
            credentials: Credentials passed to ``AdminClient``.
        """
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        """Return the path of this account's project/zone location."""
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        """Return the path of reservation ``name`` (reservations use the region)."""
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        """Return the path of topic ``name`` in this account's zone."""
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        """Return the path of subscription ``name`` in this account's zone."""
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        """Create reservation ``name`` with the given throughput capacity."""
        path = self.reservation_path(name)
        reservation = GCPReservation(name=str(path),
                                     throughput_capacity=capacity)
        self.client.create_reservation(reservation)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        """Create topic ``name`` and return a ``Topic`` wrapper for it.

        Args:
            name: Topic name.
            partition_count: Number of partitions of the topic.
            partition_size_gib: Per-partition retention size in GiB.
            reservation_name: Reservation supplying the topic's throughput.
        """
        # A topic name can not be reused within one hour of deletion.
        top_path = self.topic_path(name)
        res_path = self.reservation_path(reservation_name)
        topic = GCPTopic(
            name=str(top_path),
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=str(res_path)))
        self.client.create_topic(topic)
        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        """Delete topic ``name``."""
        path = self.topic_path(name)
        self.client.delete_topic(path)

    def get_topic(self, name: str) -> Topic:
        """Return a ``Topic`` wrapper for ``name`` without calling the service."""
        return Topic(name, self)
class Topic:
    """A named Pub/Sub Lite topic belonging to an ``Account``."""

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self,
                            name: str,
                            *,
                            pos: Optional[str] = None) -> Subscription:
        """Create subscription ``name`` on this topic and return its wrapper.

        Args:
            name: Subscription name.
            pos: Where the subscription starts in the backlog: ``'beginning'``
                (also used when ``None``) or ``'end'``.

        Raises:
            ValueError: If ``pos`` is any other value.
        """
        # Validate the argument before doing any other work.
        if pos is None or pos == 'beginning':
            starting_offset = BacklogLocation.BEGINNING
        elif pos == 'end':
            starting_offset = BacklogLocation.END
        else:
            raise ValueError(
                'Argument pos only accepts one of two values - "beginning" or "end"'
            )
        path = self.account.subscription_path(name)
        Conf = GCPSubscription.DeliveryConfig
        subscription = GCPSubscription(
            name=str(path),
            topic=str(self._path),
            delivery_config=Conf(
                delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))
        self.account.client.create_subscription(subscription, starting_offset)
        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        """Delete subscription ``name``."""
        path = self.account.subscription_path(name)
        self.account.client.delete_subscription(path)

    def get_subscription(self, name: str):
        """Return a ``Subscription`` wrapper for ``name`` without calling the service."""
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        """Yield a ``Publisher`` for this topic, closed when the block exits."""
        with Publisher(self, **kwargs) as pub:
            yield pub
class Publisher:
def __init__(self, topic: Topic, *, batch_size: int = 100):
self.topic = topic
self._batch_config = {
'max_bytes': 3 * 1024 * 1024, # 3 Mb; must be None:
self._messages.put(data)
class Subscription:
    """A named subscription attached to a ``Topic``."""

    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self, *, backlog=None):
        """Yield a running ``Subscriber``; it is stopped when the block exits.

        Args:
            backlog: Forwarded to ``Subscriber`` as its ``backlog`` argument.
        """
        with Subscriber(self, backlog=backlog) as sub:
            yield sub
class Subscriber:
    """Context manager that streams a subscription's messages into a queue.

    ``__enter__`` starts a worker thread that opens a ``SubscriberClient``
    and subscribes with a callback that enqueues each message. ``get`` takes
    one message off the queue, unpickles its payload and acks it.
    ``__exit__`` asks the worker to stop, discards whatever is still queued
    and waits for the worker to finish.
    """

    def __init__(self, subscription: Subscription, backlog: int = None,
                 *, ack_flush_seconds: float = 0.5):
        """Prepare a subscriber; nothing is started until ``__enter__``.

        Args:
            subscription: The subscription to read from.
            backlog: Queue size and ``messages_outstanding`` flow-control
                limit; falls back to 100 when falsy.
            ack_flush_seconds: How long to wait before cancelling the
                subscribe future so acks already issued can be sent. The
                default is deliberately longer than the 0.1 s flush interval
                reported for the client library.
        """
        self.subscription = subscription
        self._backlog = backlog or 100
        self._ack_flush_seconds = ack_flush_seconds
        self._cancel_requested: bool = False
        self._messages: Optional[queue.Queue] = None
        self._pool: Optional[ThreadPoolExecutor] = None
        # Sentinel the worker posts once it has stopped and drained the queue.
        self._NOMORE = object()
        self._subscribe_task = None

    def __enter__(self):
        # Reset the flag so the same instance can be entered more than once.
        self._cancel_requested = False
        self._pool = ThreadPoolExecutor(1).__enter__()
        self._messages = queue.Queue(self._backlog)
        self._subscribe_task = self._pool.submit(self._subscribe, self._messages)
        return self

    def _subscribe(self, messages: queue.Queue) -> None:
        """Worker: subscribe, wait for the stop request, then shut down."""

        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            messages.put(msg)

        try:
            flowcontrol = FlowControlSettings(
                messages_outstanding=self._backlog,
                bytes_outstanding=1024 * 1024 * 10)
            subscriber = SubscriberClient(
                credentials=self.subscription.topic.account.credentials)
            with subscriber:
                fut = subscriber.subscribe(
                    self.subscription._path, callback, flowcontrol)
                logger.info('subscribe sent to gcp')
                while not self._cancel_requested:
                    time.sleep(0.003)
                # Cancelling right away was observed to drop acks that had
                # been issued but not yet sent, so already-consumed messages
                # were redelivered to the next subscriber. Give them time to
                # be flushed first.
                if self._ack_flush_seconds > 0:
                    time.sleep(self._ack_flush_seconds)
                fut.cancel()
                fut.result()
        finally:
            # Always post the sentinel, even on failure, so that __exit__
            # (which blocks waiting for it) can never hang.
            self._drain_and_signal(messages)

    def _drain_and_signal(self, messages: queue.Queue) -> None:
        """Discard every queued message, then enqueue the sentinel."""
        while True:
            try:
                while True:
                    messages.get_nowait()
            except queue.Empty:
                pass
            try:
                messages.put_nowait(self._NOMORE)
                return
            except queue.Full:
                # Something was enqueued in between; drain again.
                continue

    def __exit__(self, *args, **kwargs):
        if self._pool is None:
            return
        try:
            if self._subscribe_task is not None:
                self._cancel_requested = True
                # Throw away unconsumed messages until the worker signals
                # that it has stopped.
                while self._messages.get() is not self._NOMORE:
                    pass
                # Re-raises any exception the worker terminated with.
                self._subscribe_task.result()
        finally:
            self._subscribe_task = None
            self._messages = None
            self._pool.__exit__(*args, **kwargs)
            self._pool = None

    def get(self, timeout=None):
        """Return the unpickled payload of the next message and ack it.

        Args:
            timeout: ``0`` for a non-blocking read, ``None`` to block
                indefinitely, otherwise the number of seconds to wait.

        Raises:
            queue.Empty: If no message arrives within ``timeout``.
            RuntimeError: If the worker has already stopped without raising;
                if it stopped with an error, that error is raised instead.
        """
        if timeout is not None and timeout == 0:
            msg = self._messages.get_nowait()
        else:
            msg = self._messages.get(block=True, timeout=timeout)
        if msg is self._NOMORE:
            # The worker is gone: leave the sentinel for __exit__ and surface
            # the worker's error, if any.
            self._messages.put_nowait(msg)
            self._subscribe_task.result()
            raise RuntimeError('subscriber has stopped')
        # SECURITY: pickle.loads executes arbitrary code from the payload;
        # only use this with topics whose publishers are trusted.
        data = pickle.loads(msg.data)
        msg.ack()
        return data
def get_account() -> Account:
    """Build the ``Account`` used by the tests.

    The project id and credentials are placeholders that must be filled in
    before the script can run.
    """
    return Account(project_id='--fill-in-proj-id--',
                   region='us-central1',
                   zone='us-central1-a',
                   credentials='--fill-in-creds--')
# This test shows that it takes extremely long to get the first message
# in `subscribe`.
def test1(account):
    """Publish three messages and time how long the first ``get`` takes.

    Creates a uniquely named topic and subscription and deletes both again,
    even if the test fails.
    """
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    try:
        with topic.get_publisher() as p:
            p.put(1)
            p.put(2)
            p.put(3)
        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                t0 = time.time()
                logger.info('getting the first message')
                z = s.get()
                t1 = time.time()
                logger.info('  got the first message')
                print(z)
            print('getting the first msg took', t1 - t0, 'seconds')
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)
def test2(account):
    """Check that a second subscriber resumes after the acked messages.

    Publishes 30 integers, consumes (and acks) the first 10 with one
    subscriber, then opens a second subscriber on the same subscription and
    expects it to continue at 10. Prints the first mismatch and returns.
    The topic and subscription are deleted in every case.
    """
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    N = 30
    try:
        with topic.get_publisher(batch_size=1) as p:
            for i in range(N):
                p.put(i)
        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                for i in range(10):
                    z = s.get()
                    assert z == i
            # The following block shows that the subscriber
            # resets to the first message, not as expected
            # that it picks up where the last block left.
            with sub.get_subscriber() as s:
                for i in range(10, 20):
                    z = s.get()
                    # Explicit comparison: still works under `python -O`.
                    if z != i:
                        print(z, '!=', i)
                        return
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)
if __name__ == '__main__':
    a = get_account()
    # Both tests use the 'default' reservation; reuse it if it already exists.
    try:
        a.create_reservation('default')
    except AlreadyExists:
        pass
    test1(a)
    print('')
    test2(a)
我无法重现您的问题,但我认为您应该查阅有关使用 Cloud Pub/Sub Lite 的官方文档,了解其处理方式。
这是我从 Receiving messages 示例中提取和更新的代码,它按预期工作,它将从 lite-topic 获取消息并确认以避免再次获取它。如果重新运行,我只会在有数据可拉的情况下获取数据。我添加了代码,以便您检查是否有与您的代码不同的地方。
consumer.py
from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
FlowControlSettings,
SubscriptionPath,
MessageMetadata,
)
from google.cloud.pubsub_v1.types import PubsubMessage
# TODO(developer): replace the placeholder values below with your own.
# The project number must be a real value; a bare `project-number` is parsed
# as the subtraction `project - number` and raises NameError.
project_number = 1122334455
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
# Seconds to listen before the subscriber is shut down.
timeout = 90
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)
def callback(message: PubsubMessage):
    """Print the payload and decoded metadata of a message, then ack it."""
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()
# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:
    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )
    print(f"Listening for messages on {str(subscription_path)}...")
    try:
        streaming_pull_future.result(timeout=timeout)
    # A tuple is required to catch both: `TimeoutError or KeyboardInterrupt`
    # evaluates to just `TimeoutError`, so Ctrl-C would not be handled.
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()
我唯一能复现您所述情况的方式,是使用不同的订阅。但在那种情况下,不同的订阅各自从主题获取消息,每个订阅都会收到相同的已存储消息,如 Receiving messages from Lite subscriptions 中所述。
考虑一下:
- 检查您的订阅传送配置。您可以使用 Create and manage Lite subscriptions 页面进行指导。
- 检查您的代码和官方示例是否以某种方式保留了相同的结构。对于我的情况,我检查了以下示例:
我找到了解决办法。在取消 subscribe 返回的 future 之前,需要先 sleep 一小段时间,让确认(ack)得以刷新(即真正发送出去)。具体来说,google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS(值为 0.1)似乎就是需要关注的时间间隔;为保险起见,sleep 的时间要比它稍长一些。
这是 google 包中的一个 bug。“取消” future 应当意味着放弃尚未处理的消息,但已经提交的确认仍应被发送出去。这个 bug 之所以可能一直没有被注意到,是因为重复投递消息并不算错误。
我正在使用 Google Pub/Sub Lite:一个只有单个分区和少量消息的小型测试主题,使用 Python 客户端库。我通过回调执行标准的 SubscriberClient.subscribe。回调将消息放入队列中;当消息从队列中取出用于消费时,调用它的 ack。当我想停止时,我调用 subscribe_future.cancel(); subscribe_future.result(),并丢弃队列中未使用的消息。
假设我知道该主题有 30 条消息。我在停止之前消耗了其中的 10 个。然后我在同一个订阅中重新启动一个新的 SubscriberClient
并接收消息。我希望从第 11 条消息开始,但实际上却从第一条开始。也就是说,之前的订阅者已经确认了前 10 条,但服务器好像没有收到这些确认。
我认为 ack 可能需要一些时间才能到达服务器。所以我等了 2 分钟才开始第二次订阅。没有帮助。
然后我想,也许是订阅者对象在管理 ack 调用,我需要在取消之前先“刷新”(flush)它们,但我发现了另一个关于这一点的问题。
我错过了什么?谢谢。
这是代码。如果您有 pubsublite 帐户,则在您填写凭据后可以执行代码。代码显示了两个问题,一个是本题的主题;另一个在
# Using python 3.8 from __future__ import annotations import logging import pickle import queue import time import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from typing import Union, Optional from google.api_core.exceptions import AlreadyExists from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite import AdminClient, PubSubMessage from google.cloud.pubsublite import Reservation as GCPReservation from google.cloud.pubsublite import Subscription as GCPSubscription from google.cloud.pubsublite import Topic as GCPTopic from google.cloud.pubsublite.cloudpubsub import (PublisherClient, SubscriberClient) from google.cloud.pubsublite.types import (BacklogLocation, CloudZone, LocationPath, ReservationPath, SubscriptionPath, TopicPath, ) from google.cloud.pubsublite.types import FlowControlSettings from google.oauth2.service_account import Credentials logging.getLogger('google.cloud').setLevel(logging.WARNING) logger = logging.getLogger(__name__) FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') class Account: def __init__(self, project_id: str, region: str, zone: str, credentials: Credentials, ): self.project_id = project_id self.region = region self.zone = CloudZone.parse(zone) self.credentials = credentials self.client = AdminClient(region=region, credentials=credentials) def location_path(self) -> LocationPath: return LocationPath(self.project_id, self.zone) def reservation_path(self, name: str) -> ReservationPath: return ReservationPath(self.project_id, self.region, name) def topic_path(self, name: str) -> TopicPath: return TopicPath(self.project_id, self.zone, name) def subscription_path(self, name: str) -> SubscriptionPath: return SubscriptionPath(self.project_id, self.zone, name) def create_reservation(self, name: str, *, capacity: int = 32) -> None: path = self.reservation_path(name) reservation 
= GCPReservation(name=str(path), throughput_capacity=capacity) self.client.create_reservation(reservation) # logger.info('reservation %s created', name) def create_topic(self, name: str, *, partition_count: int = 1, partition_size_gib: int = 30, reservation_name: str = 'default') -> Topic: # A topic name can not be reused within one hour of deletion. top_path = self.topic_path(name) res_path = self.reservation_path(reservation_name) topic = GCPTopic( name=str(top_path), partition_config=GCPTopic.PartitionConfig(count=partition_count), retention_config=GCPTopic.RetentionConfig( per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024), reservation_config=GCPTopic.ReservationConfig( throughput_reservation=str(res_path))) self.client.create_topic(topic) # logger.info('topic %s created', name) return Topic(name, self) def delete_topic(self, name: str) -> None: path = self.topic_path(name) self.client.delete_topic(path) # logger.info('topic %s deleted', name) def get_topic(self, name: str) -> Topic: return Topic(name, self) class Topic: def __init__(self, name: str, account: Account): self.account = account self.name = name self._path = self.account.topic_path(name) def create_subscription(self, name: str, *, pos: str = None) -> Subscription: path = self.account.subscription_path(name) if pos is None or pos == 'beginning': starting_offset = BacklogLocation.BEGINNING elif pos == 'end': starting_offset = BacklogLocation.END else: raise ValueError( 'Argument start only accepts one of two values - "beginning" or "end"' ) Conf = GCPSubscription.DeliveryConfig subscription = GCPSubscription( name=str(path), topic=str(self._path), delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY)) self.account.client.create_subscription(subscription, starting_offset) # logger.info('subscription %s created for topic %s', name, self.name) return Subscription(name, self) def delete_subscription(self, name: str) -> None: path = 
self.account.subscription_path(name) self.account.client.delete_subscription(path) # logger.info('subscription %s deleted from topic %s', name, self.name) def get_subscription(self, name: str): return Subscription(name, self) @contextmanager def get_publisher(self, **kwargs): with Publisher(self, **kwargs) as pub: yield pub class Publisher: def __init__(self, topic: Topic, *, batch_size: int = 100): self.topic = topic self._batch_config = { 'max_bytes': 3 * 1024 * 1024, # 3 Mb; must be None: self._messages.put(data) class Subscription: def __init__(self, name: str, topic: Topic): self.topic = topic self.name = name self._path = topic.account.subscription_path(name) @contextmanager def get_subscriber(self, *, backlog=None): with Subscriber(self, backlog=backlog) as sub: yield sub class Subscriber: def __init__(self, subscription: Subscription, backlog: int = None): self.subscription = subscription self._backlog = backlog or 100 self._cancel_requested: bool = None self._messages: queue.Queue = None self._pool: ThreadPoolExecutor = None self._NOMORE = object() self._subscribe_task = None def __enter__(self): self._pool = ThreadPoolExecutor(1).__enter__() self._messages = queue.Queue(self._backlog) messages = self._messages def callback(msg: PubSubMessage): logger.info('got %s', pickle.loads(msg.data)) messages.put(msg) def _subscribe(): flowcontrol = FlowControlSettings( messages_outstanding=self._backlog, bytes_outstanding=1024 * 1024 * 10) subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials) with subscriber: fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol) logger.info('subscribe sent to gcp') while True: if self._cancel_requested: fut.cancel() fut.result() while True: while not messages.empty(): try: _ = messages.get_nowait() except queue.Empty: break try: messages.put_nowait(self._NOMORE) break except queue.Full: continue break time.sleep(0.003) self._subscribe_task = self._pool.submit(_subscribe) return 
self def __exit__(self, *args, **kwargs): if self._pool is not None: if self._subscribe_task is not None: self._cancel_requested = True while True: z = self._messages.get() if z is self._NOMORE: break self._subscribe_task.result() self._subscribe_task = None self._messages = None self._pool.__exit__(*args, **kwargs) self._pool = None def get(self, timeout=None): if timeout is not None and timeout == 0: msg = self._messages.get_nowait() else: msg = self._messages.get(block=True, timeout=timeout) data = pickle.loads(msg.data) msg.ack() return data def get_account() -> Account: return Account(project_id='--fill-in-proj-id--', region='us-central1', zone='us-central1-a', credentials='--fill-in-creds--') # This test shows that it takes extremely long to get the first messsage # in `subscribe`. def test1(account): name = 'test-' + str(uuid.uuid4()) topic = account.create_topic(name) try: with topic.get_publisher() as p: p.put(1) p.put(2) p.put(3) sub = topic.create_subscription(name) try: with sub.get_subscriber() as s: t0 = time.time() logger.info('getting the first message') z = s.get() t1 = time.time() logger.info(' got the first message') print(z) print('getting the first msg took', t1 - t0, 'seconds') finally: topic.delete_subscription(name) finally: account.delete_topic(name) def test2(account): name = 'test-' + str(uuid.uuid4()) topic = account.create_topic(name) N = 30 try: with topic.get_publisher(batch_size=1) as p: for i in range(N): p.put(i) sub = topic.create_subscription(name) try: with sub.get_subscriber() as s: for i in range(10): z = s.get() assert z == i # The following block shows that the subscriber # resets to the first message, not as expected # that it picks up where the last block left. 
with sub.get_subscriber() as s: for i in range(10, 20): z = s.get() try: assert z == i except AssertionError as e: print(z, '!=', i) return finally: topic.delete_subscription(name) finally: account.delete_topic(name) if __name__ == '__main__': a = get_account() try: a.create_reservation('default') except AlreadyExists: pass test1(a) print('') test2(a)
我无法重现您的问题,但我认为您应该查阅有关使用 Cloud Pub/Sub Lite 的官方文档,了解其处理方式。
这是我从 Receiving messages 示例中提取和更新的代码,它按预期工作,它将从 lite-topic 获取消息并确认以避免再次获取它。如果重新运行,我只会在有数据可拉的情况下获取数据。我添加了代码,以便您检查是否有与您的代码不同的地方。
consumer.py
from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
FlowControlSettings,
SubscriptionPath,
MessageMetadata,
)
from google.cloud.pubsub_v1.types import PubsubMessage
# TODO(developer): replace the placeholder values below with your own.
# The project number must be a real value; a bare `project-number` is parsed
# as the subtraction `project - number` and raises NameError.
project_number = 1122334455
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
# Seconds to listen before the subscriber is shut down.
timeout = 90
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)
def callback(message: PubsubMessage):
    """Print the payload and decoded metadata of a message, then ack it."""
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()
# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:
    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )
    print(f"Listening for messages on {str(subscription_path)}...")
    try:
        streaming_pull_future.result(timeout=timeout)
    # A tuple is required to catch both: `TimeoutError or KeyboardInterrupt`
    # evaluates to just `TimeoutError`, so Ctrl-C would not be handled.
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()
我唯一能复现您所述情况的方式,是使用不同的订阅。但在那种情况下,不同的订阅各自从主题获取消息,每个订阅都会收到相同的已存储消息,如 Receiving messages from Lite subscriptions 中所述。
考虑一下:
- 检查您的订阅传送配置。您可以使用 Create and manage Lite subscriptions 页面进行指导。
- 检查您的代码和官方示例是否以某种方式保留了相同的结构。对于我的情况,我检查了以下示例:
我找到了解决办法。在取消 subscribe 返回的 future 之前,需要先 sleep 一小段时间,让确认(ack)得以刷新(即真正发送出去)。具体来说,google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS(值为 0.1)似乎就是需要关注的时间间隔;为保险起见,sleep 的时间要比它稍长一些。
这是 google 包中的一个 bug。“取消” future 应当意味着放弃尚未处理的消息,但已经提交的确认仍应被发送出去。这个 bug 之所以可能一直没有被注意到,是因为重复投递消息并不算错误。