Google Cloud PubSub: Not sending/receiving all messages from Cloud Functions
Summary: My client code triggers 861 background Google Cloud Functions by publishing messages to a Pub/Sub topic. Each Cloud Function performs a task, uploads the result to Google Storage, and publishes a message to another Pub/Sub topic that the client code is listening on. Although all of the Cloud Functions execute, the client code does not receive all of the messages (verified against the number of results in Google Storage).
Server side: I have a background Google Cloud Function that is triggered each time a message is published to a TRIGGER Pub/Sub topic. The custom attributes of the message data act as function parameters, determining the specific task the function performs. It then uploads the result to a bucket in Google Storage and publishes a message (with the taskID and execution-time details) to a RESULTS Pub/Sub topic (different from the one used to trigger this function).
Client side: I need to perform 861 different tasks, which means invoking the Cloud Function with 861 slightly different inputs. The tasks are similar, and the Cloud Function takes 20 seconds to 2 minutes to execute one (median around 1 minute). I created a Python script for this that I run from the Google Cloud Shell (or a local machine shell). The client Python script publishes 861 messages to the TRIGGER Pub/Sub topic, triggering as many Cloud Functions concurrently, each of which is passed a unique taskID in the range [0, 860]. The client Python script then polls the RESULTS Pub/Sub topic in a "synchronous pull" fashion for any messages. The Cloud Functions, after performing their tasks, publish messages with the unique taskID and timing details to the RESULTS Pub/Sub topic. This unique taskID is used by the client to identify which task a message came from. It also helps in identifying duplicate messages, which are discarded.
Basic steps:
- The client Python script publishes 861 messages (each with a unique taskID) to the TRIGGER Pub/Sub topic and waits for result messages from the Cloud Functions.
- 861 different Cloud Functions are invoked; each performs a task, uploads the result to Google Storage, and publishes a message (with the taskID and execution-time details) to the RESULTS Pub/Sub topic.
- The client pulls all messages synchronously and marks the corresponding tasks as complete (a minimal sketch of this pull pattern follows; the full client.py appears further below).
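At its core, the client's receive side is a synchronous pull followed by an acknowledge. Here is a minimal sketch of that pull-and-acknowledge pattern, with placeholder names, using the same google-cloud-pubsub API as the full client.py given further below:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "<Google Cloud Project ID>", "<subscriber to RESULTS_TOPIC>")

# Pull up to 10 messages in one blocking call and read their attributes.
response = subscriber.pull(subscription_path, max_messages=10)
for received_message in response.received_messages:
    print(received_message.message.attributes['taskID'])

# Acknowledge the pulled messages so they are not redelivered.
if response.received_messages:
    subscriber.acknowledge(
        subscription_path,
        [m.ack_id for m in response.received_messages])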
Problem:
When the client polls for messages from the RESULTS Pub/Sub topic, I do not receive messages for all of the taskIDs. I am sure that the Cloud Functions were invoked and executed correctly (I have 861 results in the Google Storage bucket). I repeated this a number of times and it happened every time. Strangely, the number of missing taskIDs changed on every run, and different taskIDs went missing across different runs. I am also keeping track of the number of duplicate taskIDs received. The number of unique taskIDs received, missing, and repeated for 5 independent runs is given in the table below.
SN   # of Tasks   Received   Missing   Repeated
1    861          860        1         25
2    861          840        21        3
3    861          851        10        1
4    861          837        24        3
5    861          856        5         1
I am not sure where this problem could be arising from. Given the random nature of how many and which taskIDs go missing, I suspect some bug in the Pub/Sub at-least-once delivery logic. If, in the Cloud Function, I sleep for a few seconds instead of performing the task, e.g. with time.sleep(5), then everything works fine (I receive all 861 taskIDs at the client).
Code to reproduce this issue:
In the following, main.py and requirements.txt are deployed as the Google Cloud Function, while client.py is the client code. I ran the client with 100 concurrent tasks as python client.py 100, repeated 5 times. A different number of taskIDs went missing each time.
requirements.txt
google-cloud-pubsub
main.py
"""
This file is deployed as Google Cloud Function. This function starts,
sleeps for some seconds and pulishes back the taskID.
Deloyment:
gcloud functions deploy gcf_run --runtime python37 --trigger-topic <TRIGGER_TOPIC> --memory=128MB --timeout=300s
"""
import time
from random import randint
from google.cloud import pubsub_v1
# Global variables
project_id = "<Your Google Cloud Project ID>" # Your Google Cloud Project ID
topic_name = "<RESULTS_TOPIC>" # Your Pub/Sub topic name
def gcf_run(data, context):
"""Background Cloud Function to be triggered by Pub/Sub.
Args:
data (dict): The dictionary with data specific to this type of event.
context (google.cloud.functions.Context): The Cloud Functions event
metadata.
"""
# Message should contain taskID (in addition to the data)
if 'attributes' in data:
attributes = data['attributes']
if 'taskID' in attributes:
taskID = attributes['taskID']
else:
print('taskID missing!')
return
else:
print('attributes missing!')
return
# Sleep for a random time beteen 30 seconds to 1.5 minutes
print("Start execution for {}".format(taskID))
sleep_time = randint(30, 90) # sleep for this many seconds
time.sleep(sleep_time) # sleep for few seconds
# Marks this task complete by publishing a message to Pub/Sub.
data = u'Message number {}'.format(taskID)
data = data.encode('utf-8') # Data must be a bytestring
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
publisher.publish(topic_path, data=data, taskID=taskID)
return
client.py
"""
The client code creates the given number of tasks and publishes to Pub/Sub,
which in turn calls the Google Cloud Functions concurrently.
Run:
python client.py 100
"""
from __future__ import print_function
import sys
import time
from google.cloud import pubsub_v1
# Global variables
project_id = "<Google Cloud Project ID>" # Google Cloud Project ID
topic_name = "<TRIGGER_TOPIC>" # Pub/Sub topic name to publish
subscription_name = "<subscriber to RESULTS_TOPIC>" # Pub/Sub subscription name
num_experiments = 5 # number of times to repeat the experiment
time_between_exp = 120.0 # number of seconds between experiments
# Initialize the Publisher (to send commands that invoke Cloud Functions)
# as well as Subscriber (to receive results written by the Cloud Functions)
# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
class Task:
"""
A task which will execute the Cloud Function once.
Attributes:
taskID (int) : A unique number given to a task (starting from 0).
complete (boolean) : Flag to indicate if this task has completed.
"""
def __init__(self, taskID):
self.taskID = taskID
self.complete = False
def start(self):
"""
Start the execution of Cloud Function by publishing a message with
taskID to the Pub/Sub topic.
"""
data = u'Message number {}'.format(self.taskID)
data = data.encode('utf-8') # Data must be a bytestring
publisher.publish(topic_path, data=data, taskID=str(self.taskID))
def end(self):
"""
Mark the end of this task.
Returns (boolean):
True if normal, False if task was already marked before.
"""
# If this task was not complete, mark it as completed
if not self.complete:
self.complete = True
return True
return False
# [END of Task Class]
def createTasks(num_tasks):
"""
Create a list of tasks and return it.
Args:
num_tasks (int) : Number of tasks (Cloud Function calls)
Returns (list):
A list of tasks.
"""
all_tasks = list()
for taskID in range(0, num_tasks):
all_tasks.append(Task(taskID=taskID))
return all_tasks
def receiveResults(all_tasks):
"""
Receives messages from the Pub/Sub subscription. I am using a blocking
Synchronous Pull instead of the usual asynchronous pull with a callback
funtion as I rely on a polling pattern to retrieve messages.
See: https://cloud.google.com/pubsub/docs/pull
Args:
all_tasks (list) : List of all tasks.
"""
num_tasks = len(all_tasks)
total_msg_received = 0 # track the number of messages received
NUM_MESSAGES = 10 # maximum number of messages to pull synchronously
TIMEOUT = 600.0 # number of seconds to wait for response (10 minutes)
# Keep track of elapsed time and exit if > TIMEOUT
__MyFuncStartTime = time.time()
__MyFuncElapsedTime = 0.0
print('Listening for messages on {}'.format(subscription_path))
while (total_msg_received < num_tasks) and (__MyFuncElapsedTime < TIMEOUT):
# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path,
max_messages=NUM_MESSAGES, timeout=TIMEOUT, retry=None)
ack_ids = []
# Keep track of all received messages
for received_message in response.received_messages:
if received_message.message.attributes:
attributes = received_message.message.attributes
taskID = int(attributes['taskID'])
if all_tasks[taskID].end():
# increment count only if task completes the first time
# if False, we received a duplicate message
total_msg_received += 1
# print("Received taskID = {} ({} of {})".format(
# taskID, total_msg_received, num_tasks))
# else:
# print('REPEATED: taskID {} was already marked'.format(taskID))
else:
print('attributes missing!')
ack_ids.append(received_message.ack_id)
# Acknowledges the received messages so they will not be sent again.
if ack_ids:
subscriber.acknowledge(subscription_path, ack_ids)
time.sleep(0.2) # Wait 200 ms before polling again
__MyFuncElapsedTime = time.time() - __MyFuncStartTime
# print("{} s elapsed. Listening again.".format(__MyFuncElapsedTime))
# if total_msg_received != num_tasks, function exit due to timeout
if total_msg_received != num_tasks:
print("WARNING: *** Receiver timed out! ***")
print("Received {} messages out of {}. Done.".format(
total_msg_received, num_tasks))
def main(num_tasks):
"""
Main execution point of the program
"""
for experiment_num in range(1, num_experiments + 1):
print("Starting experiment {} of {} with {} tasks".format(
experiment_num, num_experiments, num_tasks))
# Create all tasks and start them
all_tasks = createTasks(num_tasks)
for task in all_tasks: # Start all tasks
task.start()
print("Published {} taskIDs".format(num_tasks))
receiveResults(all_tasks) # Receive message from Pub/Sub subscription
print("Waiting {} seconds\n\n".format(time_between_exp))
time.sleep(time_between_exp) # sleep between experiments
if __name__ == "__main__":
if(len(sys.argv) != 2):
print("usage: python client.py <num_tasks>")
print(" num_tasks: Number of concurrent Cloud Function calls")
sys.exit()
num_tasks = int(sys.argv[1])
main(num_tasks)
In your Cloud Function, in this line:
publisher.publish(topic_path, data=data, taskID=taskID)
you are not waiting for the future that publisher.publish returns. publish only hands the message to a background thread for batching and sending, so you have no guarantee that the publish to the topic has actually happened by the time you fall off the end of the gcf_run function, while the message on the TRIGGER topic's Cloud Function subscription gets acknowledged anyway.
Instead, to wait until the publish has happened before the Cloud Function terminates, this should be:
publisher.publish(topic_path, data=data, taskID=taskID).result()
You should also avoid spinning up and tearing down the publisher client on every function call; instead, make the client a global variable.
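Put together, a minimal sketch of the corrected publishing path in main.py might look like the following (placeholders as in the question; the actual task work is elided):

from google.cloud import pubsub_v1

project_id = "<Your Google Cloud Project ID>"
topic_name = "<RESULTS_TOPIC>"

# Create the publisher client once, at module load time, so that warm
# (reused) function instances do not recreate it on every invocation.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def gcf_run(data, context):
    taskID = data['attributes']['taskID']
    # ... perform the task and upload the result to Google Storage ...
    message = u'Message number {}'.format(taskID)
    message = message.encode('utf-8')  # Data must be a bytestring
    # Block on the returned future until the message has actually been
    # committed to the topic, so the function cannot terminate with it
    # still sitting in the publisher's batch.
    publisher.publish(topic_path, data=message, taskID=taskID).result()

With this change, the RESULTS message is guaranteed to have been accepted by Pub/Sub before the function returns, which closes the window in which messages were silently dropped.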