实现一个云函数来发布到由 GCS finalize 触发的 pubsub
Implementing a cloud function to publish to pubsub triggered by GCS finalize
我一直在尝试在 Python 中编写和部署云函数。 (放弃 node.js 由于杂乱的文档和相对较快的更改速度)
它旨在向 Pub/Sub 主题发布消息,当文件完成上传到 google 云存储桶 ("finalize") 时触发。
我用来部署函数的代码是
gcloud functions deploy hello_gcs_generic --runtime python37 --trigger-resource bucketcfpubsub
我一直在尝试使用 this script provided by Google
import time
from google.cloud import pubsub_v1
project_id = "bucketcfpubsub"
topic_name = "projects/bucketcfpubsub/topics/pubsub"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
message_future.add_done_callback(callback)
print('Published message IDs:')
# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
time.sleep(60)
我在 Google Cloud Console
中收到这些错误
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1
ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)
按照 帖子的说明,我从 helloworld 代码示例中复制了 requirements.txt,仅包含
google-cloud-error-reporting==0.30.0
并更新了其他云功能,例如 bigquery、存储和日志记录。然后我得到了这些错误:
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1`
我也找到了[这个帖子](ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)但我真的不明白解决方案是什么,我试过用google-cloud-pubsub替换pubsub_v1 ==0.38.0 没有帮助。我得到这个错误:
Deploying function (may take a while - up to 2 minutes)...failed.
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 724, in exec_module
File "<frozen importlib._bootstrap_external>", line 860, in get_code
File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3
此外,如果代码一旦中断,这似乎不是一个可持续的解决方案Google 将 pubsub 更新到新版本?
所以我是一个非常初学者并且很迷茫,但我希望这个文档可以帮助你们帮助我。
更新:
似乎 pubsub 和 pubsub_v1 都可以使用,但不确定有什么区别。
@dustin 我做了一个 pip install -r requirements.txt 结果与你提供的相匹配。
我还注意到将函数部署为 'hello-gcs-generic' 时出错,应将其更改为 'callback'.
python 代码现在在本地运行良好,但是使用上面的代码(OP 中的第一行代码)将其部署到云中始终returns 这个错误
ERROR: (gcloud.functions.deploy) OperationError: code=3, messa
ge=Function load error: Error: function load attempt timed out
.
您需要将 google-cloud-pubsub
添加到 requirements.txt
文件,而不是 main.py
文件。它应该是这样的:
google-cloud-error-reporting==0.30.0
google-cloud-pubsub==0.38.0
有一个 simpler Python quickstart example 可以满足您的需要。 ;-)
您引用的示例更高级。它显示了如何使用 error-handling 发布消息。高级示例中的 while(True): sleep(60)
行是为了让主线程保持活动状态,除非发出 Ctrl+C
或其等价物以从 运行 停止程序。这个 sleep
函数存在的原因是我们可以等待发布期货的回调调用完成,而不是在发布调用后立即退出程序。同样,对于您尝试学习使用 Cloud Pub/Sub 和 Cloud Functions 做的事情来说,这可能有点太复杂了。我建议避开高级示例并使用快速入门示例。
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))
print('Published messages.')
我一直在尝试在 Python 中编写和部署云函数。 (放弃 node.js 由于杂乱的文档和相对较快的更改速度)
它旨在向 Pub/Sub 主题发布消息,当文件完成上传到 google 云存储桶 ("finalize") 时触发。
我用来部署函数的代码是
gcloud functions deploy hello_gcs_generic --runtime python37 --trigger-resource bucketcfpubsub
我一直在尝试使用 this script provided by Google
import time
from google.cloud import pubsub_v1
project_id = "bucketcfpubsub"
topic_name = "projects/bucketcfpubsub/topics/pubsub"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
message_future.add_done_callback(callback)
print('Published message IDs:')
# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
time.sleep(60)
我在 Google Cloud Console
中收到这些错误ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1
ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)
按照
google-cloud-error-reporting==0.30.0
并更新了其他云功能,例如 bigquery、存储和日志记录。然后我得到了这些错误:
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1`
我也找到了[这个帖子](ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)但我真的不明白解决方案是什么,我试过用google-cloud-pubsub替换pubsub_v1 ==0.38.0 没有帮助。我得到这个错误:
Deploying function (may take a while - up to 2 minutes)...failed.
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 724, in exec_module
File "<frozen importlib._bootstrap_external>", line 860, in get_code
File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3
此外,如果代码一旦中断,这似乎不是一个可持续的解决方案Google 将 pubsub 更新到新版本?
所以我是一个非常初学者并且很迷茫,但我希望这个文档可以帮助你们帮助我。
更新:
似乎 pubsub 和 pubsub_v1 都可以使用,但不确定有什么区别。
@dustin 我做了一个 pip install -r requirements.txt 结果与你提供的相匹配。 我还注意到将函数部署为 'hello-gcs-generic' 时出错,应将其更改为 'callback'.
python 代码现在在本地运行良好,但是使用上面的代码(OP 中的第一行代码)将其部署到云中始终returns 这个错误
ERROR: (gcloud.functions.deploy) OperationError: code=3, messa
ge=Function load error: Error: function load attempt timed out
.
您需要将 google-cloud-pubsub
添加到 requirements.txt
文件,而不是 main.py
文件。它应该是这样的:
google-cloud-error-reporting==0.30.0
google-cloud-pubsub==0.38.0
有一个 simpler Python quickstart example 可以满足您的需要。 ;-)
您引用的示例更高级。它显示了如何使用 error-handling 发布消息。高级示例中的 while(True): sleep(60)
行是为了让主线程保持活动状态,除非发出 Ctrl+C
或其等价物以从 运行 停止程序。这个 sleep
函数存在的原因是我们可以等待发布期货的回调调用完成,而不是在发布调用后立即退出程序。同样,对于您尝试学习使用 Cloud Pub/Sub 和 Cloud Functions 做的事情来说,这可能有点太复杂了。我建议避开高级示例并使用快速入门示例。
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))
print('Published messages.')